This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbd72cc216e KAFKA-14121: AlterPartitionReassignments API should allow 
callers to specify the option of preserving the replication factor (#18983)
cbd72cc216e is described below

commit cbd72cc216ec0a2f3fb427a859fefded79f8059c
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Mar 5 19:23:12 2025 +0800

    KAFKA-14121: AlterPartitionReassignments API should allow callers to 
specify the option of preserving the replication factor (#18983)
    
    Reviewers: Christo Lolov <[email protected]>, Chia-Ping Tsai 
<[email protected]>, TengYao Chi <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |   7 ++
 .../admin/AlterPartitionReassignmentsOptions.java  |  21 ++++
 .../kafka/clients/admin/KafkaAdminClient.java      |   1 +
 .../AlterPartitionReassignmentsRequest.java        |   6 +
 .../AlterPartitionReassignmentsRequest.json        |   5 +-
 .../AlterPartitionReassignmentsResponse.json       |   5 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  19 ++-
 .../controller/ReplicationControlManager.java      |  42 ++++++-
 .../controller/ReplicationControlManagerTest.java  | 129 +++++++++++++++++++++
 .../tools/reassign/ReassignPartitionsCommand.java  |  26 +++--
 .../reassign/ReassignPartitionsCommandOptions.java |   2 +
 .../tools/other/ReplicationQuotasTestRig.java      |   2 +-
 .../reassign/ReassignPartitionsCommandTest.java    |  20 +++-
 .../tools/reassign/ReassignPartitionsUnitTest.java |  10 +-
 14 files changed, 270 insertions(+), 25 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index dcffef60d09..a0bb85899f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1083,6 +1083,13 @@ public interface Admin extends AutoCloseable {
      *   if the request timed out before the controller could record the new 
assignments.</li>
      *   <li>{@link 
org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
      *   If the specified assignment was not valid.</li>
+     *   <li>{@link 
org.apache.kafka.common.errors.InvalidReplicationFactorException}
+     *   If the replication factor was changed in an invalid way.
+     *   Only thrown when {@link 
AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} is set to 
false and
+     *   the request is attempting to alter reassignments (not cancel)</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnsupportedVersionException}
+     *   If {@link 
AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} was changed 
outside the default
+     *   and the server does not support the option (e.g due to an old Kafka 
version).</li>
      *   <li>{@link 
org.apache.kafka.common.errors.NoReassignmentInProgressException}
      *   If there was an attempt to cancel a reassignment for a partition 
which was not being reassigned.</li>
      * </ul>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
index 166e90404c3..74c9f3dcdec 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
@@ -23,4 +23,25 @@ import java.util.Map;
  * Options for {@link AdminClient#alterPartitionReassignments(Map, 
AlterPartitionReassignmentsOptions)}
  */
 public class AlterPartitionReassignmentsOptions extends 
AbstractOptions<AlterPartitionReassignmentsOptions> {
+
+    private boolean allowReplicationFactorChange = true;
+
+    /**
+     * Set the option indicating if the alter partition reassignments call 
should be
+     * allowed to alter the replication factor of a partition.
+     * In cases where it is not allowed, any replication factor change will 
result in an exception thrown by the API.
+     */
+    public AlterPartitionReassignmentsOptions 
allowReplicationFactorChange(boolean allow) {
+        this.allowReplicationFactorChange = allow;
+        return this;
+    }
+
+    /**
+     * A boolean indicating if the alter partition reassignments should be
+     * allowed to alter the replication factor of a partition.
+     * In cases where it is not allowed, any replication factor change will 
result in an exception thrown by the API.
+     */
+    public boolean allowReplicationFactorChange() {
+        return this.allowReplicationFactorChange;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 725e48c3656..8432ec754a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3944,6 +3944,7 @@ public class KafkaAdminClient extends AdminClient {
                     data.topics().add(reassignableTopic);
                 }
                 data.setTimeoutMs(timeoutMs);
+                
data.setAllowReplicationFactorChange(options.allowReplicationFactorChange());
                 return new AlterPartitionReassignmentsRequest.Builder(data);
             }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
index 2d289cc1497..8032207943a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
 import 
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@@ -42,6 +43,11 @@ public class AlterPartitionReassignmentsRequest extends 
AbstractRequest {
 
         @Override
         public AlterPartitionReassignmentsRequest build(short version) {
+            if (!data.allowReplicationFactorChange() && version < 1) {
+                throw new UnsupportedVersionException("The broker does not 
support the AllowReplicationFactorChange " +
+                        "option for the AlterPartitionReassignments API. 
Consider re-sending the request without the " +
+                        "option or updating the server version");
+            }
             return new AlterPartitionReassignmentsRequest(data, version);
         }
 
diff --git 
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
 
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
index f3047feb0a3..d0ccc1c088e 100644
--- 
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
+++ 
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
@@ -18,11 +18,14 @@
   "type": "request",
   "listeners": ["broker", "controller"],
   "name": "AlterPartitionReassignmentsRequest",
-  "validVersions": "0",
+  // Version 1 adds the ability to allow/disallow changing the replication 
factor as part of the request.
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": 
"60000",
       "about": "The time in ms to wait for the request to complete." },
+    { "name": "AllowReplicationFactorChange", "type": "bool", "versions": 
"1+", "default": "true",
+      "about": "The option indicating whether changing the replication factor 
of any given partition as part of this request is a valid move." },
     { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
       "about": "The topics to reassign.", "fields": [
       { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
diff --git 
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
 
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
index 0b8f60b0bab..36ce87968ec 100644
--- 
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
+++ 
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
@@ -17,11 +17,14 @@
   "apiKey": 45,
   "type": "response",
   "name": "AlterPartitionReassignmentsResponse",
-  "validVersions": "0",
+  // Version 1 adds the ability to allow/disallow changing the replication 
factor as part of the request.
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "AllowReplicationFactorChange", "type": "bool", "versions": 
"1+", "default": "true", "ignorable": true,
+      "about": "The option indicating whether changing the replication factor 
of any given partition as part of the request was allowed." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top-level error code, or 0 if there was no error." },
     { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b57f4cd722c..18c5c2bbd05 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -3452,13 +3452,28 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
     val tp3 = new TopicPartition(topic, 2)
-    createTopic(topic, numPartitions = 4)
-
+    createTopic(topic, numPartitions = 4, replicationFactor = 2)
 
     val validAssignment = Optional.of(new NewPartitionReassignment(
       (0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
     ))
 
+    val alterOptions = new AlterPartitionReassignmentsOptions
+    alterOptions.allowReplicationFactorChange(false)
+    val alterReplicaNumberTo1 = Optional.of(new 
NewPartitionReassignment(List(1.asInstanceOf[Integer]).asJava))
+    val alterReplicaNumberTo2 = Optional.of(new NewPartitionReassignment((0 
until brokerCount - 1).map(_.asInstanceOf[Integer]).asJava))
+    val alterReplicaNumberTo3 = Optional.of(new NewPartitionReassignment((0 
until brokerCount).map(_.asInstanceOf[Integer]).asJava))
+    val alterReplicaResults = client.alterPartitionReassignments(Map(
+      tp1 -> alterReplicaNumberTo1,
+      tp2 -> alterReplicaNumberTo2,
+      tp3 -> alterReplicaNumberTo3,
+    ).asJava, alterOptions).values()
+    assertDoesNotThrow(() => alterReplicaResults.get(tp2).get())
+    assertEquals("The replication factor is changed from 2 to 1",
+      assertFutureThrows(classOf[InvalidReplicationFactorException], 
alterReplicaResults.get(tp1)).getMessage)
+    assertEquals("The replication factor is changed from 2 to 3",
+      assertFutureThrows(classOf[InvalidReplicationFactorException], 
alterReplicaResults.get(tp3)).getMessage)
+
     val nonExistentTp1 = new TopicPartition("topicA", 0)
     val nonExistentTp2 = new TopicPartition(topic, 4)
     val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index b5d4993f90b..d4c613003c3 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -2058,8 +2058,10 @@ public class ReplicationControlManager {
     ControllerResult<AlterPartitionReassignmentsResponseData>
             alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
         List<ApiMessageAndVersion> records = 
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+        boolean allowRFChange = request.allowReplicationFactorChange();
         AlterPartitionReassignmentsResponseData result =
-                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null)
+                        .setAllowReplicationFactorChange(allowRFChange);
         int successfulAlterations = 0, totalAlterations = 0;
         for (ReassignableTopic topic : request.topics()) {
             ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
@@ -2067,7 +2069,7 @@ public class ReplicationControlManager {
             for (ReassignablePartition partition : topic.partitions()) {
                 ApiError error = ApiError.NONE;
                 try {
-                    alterPartitionReassignment(topic.name(), partition, 
records);
+                    alterPartitionReassignment(topic.name(), partition, 
records, allowRFChange);
                     successfulAlterations++;
                 } catch (Throwable e) {
                     log.info("Unable to alter partition reassignment for " +
@@ -2090,7 +2092,8 @@ public class ReplicationControlManager {
 
     void alterPartitionReassignment(String topicName,
                                     ReassignablePartition target,
-                                    List<ApiMessageAndVersion> records) {
+                                    List<ApiMessageAndVersion> records,
+                                    boolean allowRFChange) {
         Uuid topicId = topicsByName.get(topicName);
         if (topicId == null) {
             throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
@@ -2111,7 +2114,7 @@ public class ReplicationControlManager {
         if (target.replicas() == null) {
             record = cancelPartitionReassignment(topicName, tp, part);
         } else {
-            record = changePartitionReassignment(tp, part, target);
+            record = changePartitionReassignment(tp, part, target, 
allowRFChange);
         }
         record.ifPresent(records::add);
     }
@@ -2175,18 +2178,23 @@ public class ReplicationControlManager {
      * @param tp                The topic id and partition id.
      * @param part              The existing partition info.
      * @param target            The target partition info.
+     * @param allowRFChange     Validate if partition replication factor can 
change. KIP-860
      *
      * @return                  The ChangePartitionRecord for the new 
partition assignment,
      *                          or empty if no change is needed.
      */
     Optional<ApiMessageAndVersion> 
changePartitionReassignment(TopicIdPartition tp,
                                                                
PartitionRegistration part,
-                                                               
ReassignablePartition target) {
+                                                               
ReassignablePartition target,
+                                                               boolean 
allowRFChange) {
         // Check that the requested partition assignment is valid.
         PartitionAssignment currentAssignment = new 
PartitionAssignment(Replicas.toList(part.replicas), part::directory);
         PartitionAssignment targetAssignment = new 
PartitionAssignment(target.replicas(), clusterDescriber);
 
         validateManualPartitionAssignment(targetAssignment, 
OptionalInt.empty());
+        if (!allowRFChange) {
+            validatePartitionReplicationFactorUnchanged(part, target);
+        }
 
         List<Integer> currentReplicas = Replicas.toList(part.replicas);
         PartitionReassignmentReplicas reassignment =
@@ -2406,6 +2414,30 @@ public class ReplicationControlManager {
             newPartInfo.elr);
     }
 
+    private void 
validatePartitionReplicationFactorUnchanged(PartitionRegistration part,
+                                                             
ReassignablePartition target) {
+        int currentReassignmentSetSize;
+        if (isReassignmentInProgress(part)) {
+            Set<Integer> set = new HashSet<>();
+            for (int r : part.replicas) {
+                set.add(r);
+            }
+            for (int r : part.addingReplicas) {
+                set.add(r);
+            }
+            for (int r : part.removingReplicas) {
+                set.remove(r);
+            }
+            currentReassignmentSetSize = set.size();
+        } else {
+            currentReassignmentSetSize = part.replicas.length;
+        }
+        if (currentReassignmentSetSize != target.replicas().size()) {
+            throw new InvalidReplicationFactorException("The replication 
factor is changed from " +
+                    currentReassignmentSetSize + " to " + 
target.replicas().size());
+        }
+    }
+
     private static final class IneligibleReplica {
         private final int replicaId;
         private final String reason;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index ef58e2bc7f5..0516b134c78 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -1922,6 +1922,135 @@ public class ReplicationControlManagerTest {
         assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null, Long.MAX_VALUE));
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionDisallowReplicationFactorChange(short 
version) {
+        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder()
+                .setMetadataVersion(metadataVersion)
+                .build();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}, new int[] 
{0, 1, 2}, new int[] {0, 1, 2}});
+
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+                replication.alterPartitionReassignments(
+                        new 
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                new 
ReassignableTopic().setName("foo").setPartitions(asList(
+                                        new 
ReassignablePartition().setPartitionIndex(0).
+                                                setReplicas(asList(1, 2, 3)),
+                                        new 
ReassignablePartition().setPartitionIndex(1).
+                                                setReplicas(asList(0, 1)),
+                                        new 
ReassignablePartition().setPartitionIndex(2).
+                                                setReplicas(asList(0, 1, 2, 
3)))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+                                new 
ReassignableTopicResponse().setName("foo").setPartitions(asList(
+                                        new 
ReassignablePartitionResponse().setPartitionIndex(0).
+                                                setErrorMessage(null),
+                                        new 
ReassignablePartitionResponse().setPartitionIndex(1).
+                                                
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The 
replication factor is changed from 3 to 2"),
+                                        new 
ReassignablePartitionResponse().setPartitionIndex(2).
+                                                
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The 
replication factor is changed from 3 to 4"))))),
+                alterResult.response());
+        ctx.replay(alterResult.records());
+        ListPartitionReassignmentsResponseData currentReassigning =
+                new 
ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                        setTopics(singletonList(new OngoingTopicReassignment().
+                                setName("foo").setPartitions(singletonList(
+                                        new 
OngoingPartitionReassignment().setPartitionIndex(0).
+                                                
setRemovingReplicas(singletonList(0)).
+                                                
setAddingReplicas(singletonList(3)).
+                                                setReplicas(asList(1, 2, 3, 
0))))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(singletonList(
+                new ListPartitionReassignmentsTopics().setName("foo").
+                        setPartitionIndexes(asList(0, 1, 2))), 
Long.MAX_VALUE));
+
+        // test alter replica factor not allow to change when partition 
reassignment is ongoing
+        ControllerResult<AlterPartitionReassignmentsResponseData> 
alterReassigningResult =
+                replication.alterPartitionReassignments(
+                        new 
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                new 
ReassignableTopic().setName("foo").setPartitions(singletonList(
+                                        new 
ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 1)))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+                                new 
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+                                        new 
ReassignablePartitionResponse().setPartitionIndex(0).
+                                                
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The 
replication factor is changed from 3 to 2"))))),
+                alterReassigningResult.response());
+
+        ControllerResult<AlterPartitionReassignmentsResponseData> 
alterReassigningResult2 =
+                replication.alterPartitionReassignments(
+                        new 
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                        new 
ReassignableTopic().setName("foo").setPartitions(singletonList(
+                                                new 
ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 2, 3)))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+                                new 
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+                                        new 
ReassignablePartitionResponse().setPartitionIndex(0).
+                                                setErrorMessage(null))))),
+                alterReassigningResult2.response());
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void 
testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short 
version) {
+        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder()
+                .setMetadataVersion(metadataVersion)
+                .build();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 
2}}).topicId();
+
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+                replication.alterPartitionReassignments(
+                        new 
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                        new 
ReassignableTopic().setName("foo").setPartitions(singletonList(
+                                                new 
ReassignablePartition().setPartitionIndex(0).
+                                                        setReplicas(asList(1, 
2, 3)))))));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        setErrorMessage(null).setResponses(singletonList(
+                                new 
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+                                        new 
ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
+                alterResult.response());
+        ctx.replay(alterResult.records());
+        ListPartitionReassignmentsResponseData currentReassigning =
+                new 
ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                        setTopics(singletonList(new OngoingTopicReassignment().
+                                setName("foo").setPartitions(singletonList(
+                                        new 
OngoingPartitionReassignment().setPartitionIndex(0).
+                                                
setRemovingReplicas(singletonList(0)).
+                                                
setAddingReplicas(singletonList(3)).
+                                                setReplicas(asList(1, 2, 3, 
0))))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(singletonList(
+                new ListPartitionReassignmentsTopics().setName("foo").
+                        setPartitionIndexes(asList(0, 1, 2))), 
Long.MAX_VALUE));
+
+        // test replica factor change check takes no effect when partition 
reassignment is ongoing
+        ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult 
=
+                replication.alterPartitionReassignments(
+                        new 
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                new 
ReassignableTopic().setName("foo").setPartitions(singletonList(
+                                        new 
ReassignablePartition().setPartitionIndex(0).setReplicas(null))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new 
AlterPartitionReassignmentsResponseData().setAllowReplicationFactorChange(false).setErrorMessage(null).
+                        setResponses(singletonList(
+                                new 
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+                                        new 
ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
+                cancelResult.response());
+        ctx.replay(cancelResult.records());
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null, Long.MAX_VALUE));
+    }
+
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
     public void testAlterPartitionShouldRejectFencedBrokers(short version) {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 137092da32c..33bf23d13d1 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -20,6 +20,7 @@ import org.apache.kafka.admin.BrokerMetadata;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -182,7 +183,8 @@ public class ReassignPartitionsCommand {
                 opts.options.valueOf(opts.interBrokerThrottleOpt),
                 opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
                 opts.options.valueOf(opts.timeoutOpt),
-                Time.SYSTEM);
+                Time.SYSTEM,
+                opts.options.has(opts.disallowReplicationFactorChangeOpt));
         } else if (opts.options.has(opts.cancelOpt)) {
             cancelAssignment(adminClient,
                 
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
@@ -761,7 +763,8 @@ public class ReassignPartitionsCommand {
                                   Long interBrokerThrottle,
                                   Long logDirThrottle,
                                   Long timeoutMs,
-                                  Time time
+                                  Time time,
+                                  boolean disallowReplicationFactorChange
     ) throws ExecutionException, InterruptedException, 
JsonProcessingException, TerseException {
         Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica, 
String>> t0 = parseExecuteAssignmentArgs(reassignmentJson);
 
@@ -796,7 +799,7 @@ public class ReassignPartitionsCommand {
         }
 
         // Execute the partition reassignments.
-        Map<TopicPartition, Throwable> errors = 
alterPartitionReassignments(adminClient, proposedParts);
+        Map<TopicPartition, Throwable> errors = 
alterPartitionReassignments(adminClient, proposedParts, 
disallowReplicationFactorChange);
         if (!errors.isEmpty()) {
             throw new TerseException(
                 String.format("Error reassigning partition(s):%n%s",
@@ -941,15 +944,19 @@ public class ReassignPartitionsCommand {
     /**
      * Execute the given partition reassignments.
      *
-     * @param adminClient       The admin client object to use.
-     * @param reassignments     A map from topic names to target replica 
assignments.
-     * @return                  A map from partition objects to error strings.
+     * @param adminClient                        The admin client object to 
use.
+     * @param reassignments                      A map from topic names to 
target replica assignments.
+     * @param disallowReplicationFactorChange    Disallow replication factor 
change or not.
+     * @return                                   A map from partition objects 
to error strings.
      */
     static Map<TopicPartition, Throwable> alterPartitionReassignments(Admin 
adminClient,
-                                                                      
Map<TopicPartition, List<Integer>> reassignments) throws InterruptedException {
+                                                                      
Map<TopicPartition, List<Integer>> reassignments,
+                                                                      boolean 
disallowReplicationFactorChange) throws InterruptedException {
         Map<TopicPartition, Optional<NewPartitionReassignment>> args = new 
HashMap<>();
         reassignments.forEach((part, replicas) -> args.put(part, 
Optional.of(new NewPartitionReassignment(replicas))));
-        Map<TopicPartition, KafkaFuture<Void>> results = 
adminClient.alterPartitionReassignments(args).values();
+        AlterPartitionReassignmentsOptions options = new 
AlterPartitionReassignmentsOptions();
+        options.allowReplicationFactorChange(!disallowReplicationFactorChange);
+        Map<TopicPartition, KafkaFuture<Void>> results = 
adminClient.alterPartitionReassignments(args, options).values();
         Map<TopicPartition, Throwable> errors = new HashMap<>();
         for (Entry<TopicPartition, KafkaFuture<Void>> e :  results.entrySet()) 
{
             try {
@@ -1485,7 +1492,8 @@ public class ReassignPartitionsCommand {
             opts.commandConfigOpt,
             opts.interBrokerThrottleOpt,
             opts.replicaAlterLogDirsThrottleOpt,
-            opts.timeoutOpt
+            opts.timeoutOpt,
+            opts.disallowReplicationFactorChangeOpt
         ));
         permittedArgs.put(opts.cancelOpt, Arrays.asList(
             isBootstrapServer ? opts.bootstrapServerOpt : 
opts.bootstrapControllerOpt,
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
index 2d31c5a902a..39541288712 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
@@ -42,6 +42,7 @@ public class ReassignPartitionsCommandOptions extends 
CommandDefaultOptions {
     final OptionSpec<Long> timeoutOpt;
     final OptionSpec<?> additionalOpt;
     final OptionSpec<?> preserveThrottlesOpt;
+    final OptionSpec<?> disallowReplicationFactorChangeOpt;
 
     public ReassignPartitionsCommandOptions(String[] args) {
         super(args);
@@ -115,6 +116,7 @@ public class ReassignPartitionsCommandOptions extends 
CommandDefaultOptions {
         additionalOpt = parser.accepts("additional", "Execute this 
reassignment in addition to any " +
             "other ongoing ones. This option can also be used to change the 
throttle of an ongoing reassignment.");
         preserveThrottlesOpt = parser.accepts("preserve-throttles", "Do not 
modify broker or topic throttles.");
+        disallowReplicationFactorChangeOpt = 
parser.accepts("disallow-replication-factor-change", "Denies the ability to 
change a partition's replication factor as part of this reassignment through 
adding validation against it.");
 
         options = parser.parse(args);
     }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
 
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index 0dedf567c49..8c981e4eb44 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -249,7 +249,7 @@ public class ReplicationQuotasTestRig {
 
             ReassignPartitionsCommand.executeAssignment(adminClient, false,
                 
ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, 
Collections.emptyMap()),
-                config.throttle, -1L, 10000L, Time.SYSTEM);
+                config.throttle, -1L, 10000L, Time.SYSTEM, false);
 
             //Await completion
             waitForReassignmentToComplete();
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index d7f6f937d97..a96bccd36ed 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -93,6 +93,7 @@ import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAs
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = {
@@ -432,6 +433,23 @@ public class ReassignPartitionsCommandTest {
         }
     }
 
+    @ClusterTest
+    public void testDisallowReplicationFactorChange() {
+        createTopics();
+        String assignment = "{\"version\":1,\"partitions\":" +
+                
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
 +
+                
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[0,1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\"]},"
 +
+                
"{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3],\"log_dirs\":[\"any\"]}" +
+                "]}";
+        try (Admin admin = clusterInstance.admin()) {
+            assertEquals("Error reassigning partition(s):\n" +
+                            "bar-0: The replication factor is changed from 3 
to 1\n" +
+                            "foo-0: The replication factor is changed from 3 
to 2\n" +
+                            "foo-1: The replication factor is changed from 3 
to 4",
+                    assertThrows(TerseException.class, () -> 
executeAssignment(admin, false, assignment, -1L, -1L, 10000L, Time.SYSTEM, 
true)).getMessage());
+        }
+    }
+
     private void createTopics() {
         try (Admin admin = 
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
 clusterInstance.bootstrapServers()))) {
             Map<Integer, List<Integer>> fooReplicasAssignments = new 
HashMap<>();
@@ -654,7 +672,7 @@ public class ReassignPartitionsCommandTest {
                                       Long replicaAlterLogDirsThrottle) throws 
RuntimeException {
         try (Admin admin = 
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
 clusterInstance.bootstrapServers()))) {
             executeAssignment(admin, additional, reassignmentJson,
-                    interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, 
Time.SYSTEM);
+                    interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, 
Time.SYSTEM, false);
         } catch (ExecutionException | InterruptedException | 
JsonProcessingException | TerseException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index d383b56e755..793eba842d2 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -161,7 +161,7 @@ public class ReassignPartitionsUnitTest {
             reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 3));
             reassignments.put(new TopicPartition("quux", 0), asList(1, 2, 3));
 
-            Map<TopicPartition, Throwable> reassignmentResult = 
alterPartitionReassignments(adminClient, reassignments);
+            Map<TopicPartition, Throwable> reassignmentResult = 
alterPartitionReassignments(adminClient, reassignments,  false);
 
             assertEquals(1, reassignmentResult.size());
             assertEquals(UnknownTopicOrPartitionException.class, 
reassignmentResult.get(new TopicPartition("quux", 0)).getClass());
@@ -606,7 +606,7 @@ public class ReassignPartitionsUnitTest {
                     "{\"version\":1,\"partitions\":" +
                         
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
 +
                         
"{\"topic\":\"quux\",\"partition\":0,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
-                        "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected 
reassignment with non-existent topic to fail").getCause().getMessage());
+                        "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected 
reassignment with non-existent topic to fail").getCause().getMessage());
         }
     }
 
@@ -619,7 +619,7 @@ public class ReassignPartitionsUnitTest {
                     "{\"version\":1,\"partitions\":" +
                         
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
 +
                         
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
-                        "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected 
reassignment with non-existent broker id to fail").getMessage());
+                        "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected 
reassignment with non-existent broker id to fail").getMessage());
         }
     }
 
@@ -670,7 +670,7 @@ public class ReassignPartitionsUnitTest {
             reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 4, 
2));
             reassignments.put(new TopicPartition("bar", 0), asList(2, 3));
 
-            Map<TopicPartition, Throwable> reassignmentResult = 
alterPartitionReassignments(adminClient, reassignments);
+            Map<TopicPartition, Throwable> reassignmentResult = 
alterPartitionReassignments(adminClient, reassignments, false);
 
             assertTrue(reassignmentResult.isEmpty());
             assertEquals(String.join(System.lineSeparator(),
@@ -762,7 +762,7 @@ public class ReassignPartitionsUnitTest {
         try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
             addTopics(adminClient);
             assertStartsWith("Unexpected character",
-                assertThrows(AdminOperationException.class, () -> 
executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, 
Time.SYSTEM)).getMessage());
+                assertThrows(AdminOperationException.class, () -> 
executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, 
Time.SYSTEM, false)).getMessage());
         }
     }
 }


Reply via email to