This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cbd72cc216e KAFKA-14121: AlterPartitionReassignments API should allow
callers to specify the option of preserving the replication factor (#18983)
cbd72cc216e is described below
commit cbd72cc216ec0a2f3fb427a859fefded79f8059c
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Mar 5 19:23:12 2025 +0800
KAFKA-14121: AlterPartitionReassignments API should allow callers to
specify the option of preserving the replication factor (#18983)
Reviewers: Christo Lolov <[email protected]>, Chia-Ping Tsai
<[email protected]>, TengYao Chi <[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 7 ++
.../admin/AlterPartitionReassignmentsOptions.java | 21 ++++
.../kafka/clients/admin/KafkaAdminClient.java | 1 +
.../AlterPartitionReassignmentsRequest.java | 6 +
.../AlterPartitionReassignmentsRequest.json | 5 +-
.../AlterPartitionReassignmentsResponse.json | 5 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 19 ++-
.../controller/ReplicationControlManager.java | 42 ++++++-
.../controller/ReplicationControlManagerTest.java | 129 +++++++++++++++++++++
.../tools/reassign/ReassignPartitionsCommand.java | 26 +++--
.../reassign/ReassignPartitionsCommandOptions.java | 2 +
.../tools/other/ReplicationQuotasTestRig.java | 2 +-
.../reassign/ReassignPartitionsCommandTest.java | 20 +++-
.../tools/reassign/ReassignPartitionsUnitTest.java | 10 +-
14 files changed, 270 insertions(+), 25 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index dcffef60d09..a0bb85899f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1083,6 +1083,13 @@ public interface Admin extends AutoCloseable {
* if the request timed out before the controller could record the new
assignments.</li>
* <li>{@link
org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
* If the specified assignment was not valid.</li>
+ * <li>{@link
org.apache.kafka.common.errors.InvalidReplicationFactorException}
+ * If the replication factor was changed in an invalid way.
+ * Only thrown when {@link
AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} is set to
false and
+ * the request is attempting to alter reassignments (not cancel)</li>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedVersionException}
+ * If {@link
AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} was changed
outside the default
+ * and the server does not support the option (e.g due to an old Kafka
version).</li>
* <li>{@link
org.apache.kafka.common.errors.NoReassignmentInProgressException}
* If there was an attempt to cancel a reassignment for a partition
which was not being reassigned.</li>
* </ul>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
index 166e90404c3..74c9f3dcdec 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java
@@ -23,4 +23,25 @@ import java.util.Map;
* Options for {@link AdminClient#alterPartitionReassignments(Map,
AlterPartitionReassignmentsOptions)}
*/
public class AlterPartitionReassignmentsOptions extends
AbstractOptions<AlterPartitionReassignmentsOptions> {
+
+ private boolean allowReplicationFactorChange = true;
+
+ /**
+ * Set the option indicating if the alter partition reassignments call
should be
+ * allowed to alter the replication factor of a partition.
+ * In cases where it is not allowed, any replication factor change will
result in an exception thrown by the API.
+ */
+ public AlterPartitionReassignmentsOptions
allowReplicationFactorChange(boolean allow) {
+ this.allowReplicationFactorChange = allow;
+ return this;
+ }
+
+ /**
+ * A boolean indicating if the alter partition reassignments should be
+ * allowed to alter the replication factor of a partition.
+ * In cases where it is not allowed, any replication factor change will
result in an exception thrown by the API.
+ */
+ public boolean allowReplicationFactorChange() {
+ return this.allowReplicationFactorChange;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 725e48c3656..8432ec754a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3944,6 +3944,7 @@ public class KafkaAdminClient extends AdminClient {
data.topics().add(reassignableTopic);
}
data.setTimeoutMs(timeoutMs);
+
data.setAllowReplicationFactorChange(options.allowReplicationFactorChange());
return new AlterPartitionReassignmentsRequest.Builder(data);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
index 2d289cc1497..8032207943a 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@@ -42,6 +43,11 @@ public class AlterPartitionReassignmentsRequest extends
AbstractRequest {
@Override
public AlterPartitionReassignmentsRequest build(short version) {
+ if (!data.allowReplicationFactorChange() && version < 1) {
+ throw new UnsupportedVersionException("The broker does not
support the AllowReplicationFactorChange " +
+ "option for the AlterPartitionReassignments API.
Consider re-sending the request without the " +
+ "option or updating the server version");
+ }
return new AlterPartitionReassignmentsRequest(data, version);
}
diff --git
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
index f3047feb0a3..d0ccc1c088e 100644
---
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
+++
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json
@@ -18,11 +18,14 @@
"type": "request",
"listeners": ["broker", "controller"],
"name": "AlterPartitionReassignmentsRequest",
- "validVersions": "0",
+ // Version 1 adds the ability to allow/disallow changing the replication
factor as part of the request.
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default":
"60000",
"about": "The time in ms to wait for the request to complete." },
+ { "name": "AllowReplicationFactorChange", "type": "bool", "versions":
"1+", "default": "true",
+ "about": "The option indicating whether changing the replication factor
of any given partition as part of this request is a valid move." },
{ "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
"about": "The topics to reassign.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
diff --git
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
index 0b8f60b0bab..36ce87968ec 100644
---
a/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
+++
b/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json
@@ -17,11 +17,14 @@
"apiKey": 45,
"type": "response",
"name": "AlterPartitionReassignmentsResponse",
- "validVersions": "0",
+ // Version 1 adds the ability to allow/disallow changing the replication
factor as part of the request.
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
+ { "name": "AllowReplicationFactorChange", "type": "bool", "versions":
"1+", "default": "true", "ignorable": true,
+ "about": "The option indicating whether changing the replication factor
of any given partition as part of the request was allowed." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+",
"nullableVersions": "0+",
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b57f4cd722c..18c5c2bbd05 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -3452,13 +3452,28 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
val tp3 = new TopicPartition(topic, 2)
- createTopic(topic, numPartitions = 4)
-
+ createTopic(topic, numPartitions = 4, replicationFactor = 2)
val validAssignment = Optional.of(new NewPartitionReassignment(
(0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
))
+ val alterOptions = new AlterPartitionReassignmentsOptions
+ alterOptions.allowReplicationFactorChange(false)
+ val alterReplicaNumberTo1 = Optional.of(new
NewPartitionReassignment(List(1.asInstanceOf[Integer]).asJava))
+ val alterReplicaNumberTo2 = Optional.of(new NewPartitionReassignment((0
until brokerCount - 1).map(_.asInstanceOf[Integer]).asJava))
+ val alterReplicaNumberTo3 = Optional.of(new NewPartitionReassignment((0
until brokerCount).map(_.asInstanceOf[Integer]).asJava))
+ val alterReplicaResults = client.alterPartitionReassignments(Map(
+ tp1 -> alterReplicaNumberTo1,
+ tp2 -> alterReplicaNumberTo2,
+ tp3 -> alterReplicaNumberTo3,
+ ).asJava, alterOptions).values()
+ assertDoesNotThrow(() => alterReplicaResults.get(tp2).get())
+ assertEquals("The replication factor is changed from 2 to 1",
+ assertFutureThrows(classOf[InvalidReplicationFactorException],
alterReplicaResults.get(tp1)).getMessage)
+ assertEquals("The replication factor is changed from 2 to 3",
+ assertFutureThrows(classOf[InvalidReplicationFactorException],
alterReplicaResults.get(tp3)).getMessage)
+
val nonExistentTp1 = new TopicPartition("topicA", 0)
val nonExistentTp2 = new TopicPartition(topic, 4)
val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index b5d4993f90b..d4c613003c3 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -2058,8 +2058,10 @@ public class ReplicationControlManager {
ControllerResult<AlterPartitionReassignmentsResponseData>
alterPartitionReassignments(AlterPartitionReassignmentsRequestData
request) {
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+ boolean allowRFChange = request.allowReplicationFactorChange();
AlterPartitionReassignmentsResponseData result =
- new
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+ new
AlterPartitionReassignmentsResponseData().setErrorMessage(null)
+ .setAllowReplicationFactorChange(allowRFChange);
int successfulAlterations = 0, totalAlterations = 0;
for (ReassignableTopic topic : request.topics()) {
ReassignableTopicResponse topicResponse = new
ReassignableTopicResponse().
@@ -2067,7 +2069,7 @@ public class ReplicationControlManager {
for (ReassignablePartition partition : topic.partitions()) {
ApiError error = ApiError.NONE;
try {
- alterPartitionReassignment(topic.name(), partition,
records);
+ alterPartitionReassignment(topic.name(), partition,
records, allowRFChange);
successfulAlterations++;
} catch (Throwable e) {
log.info("Unable to alter partition reassignment for " +
@@ -2090,7 +2092,8 @@ public class ReplicationControlManager {
void alterPartitionReassignment(String topicName,
ReassignablePartition target,
- List<ApiMessageAndVersion> records) {
+ List<ApiMessageAndVersion> records,
+ boolean allowRFChange) {
Uuid topicId = topicsByName.get(topicName);
if (topicId == null) {
throw new UnknownTopicOrPartitionException("Unable to find a topic
" +
@@ -2111,7 +2114,7 @@ public class ReplicationControlManager {
if (target.replicas() == null) {
record = cancelPartitionReassignment(topicName, tp, part);
} else {
- record = changePartitionReassignment(tp, part, target);
+ record = changePartitionReassignment(tp, part, target,
allowRFChange);
}
record.ifPresent(records::add);
}
@@ -2175,18 +2178,23 @@ public class ReplicationControlManager {
* @param tp The topic id and partition id.
* @param part The existing partition info.
* @param target The target partition info.
+ * @param allowRFChange Validate if partition replication factor can
change. KIP-860
*
* @return The ChangePartitionRecord for the new
partition assignment,
* or empty if no change is needed.
*/
Optional<ApiMessageAndVersion>
changePartitionReassignment(TopicIdPartition tp,
PartitionRegistration part,
-
ReassignablePartition target) {
+
ReassignablePartition target,
+ boolean
allowRFChange) {
// Check that the requested partition assignment is valid.
PartitionAssignment currentAssignment = new
PartitionAssignment(Replicas.toList(part.replicas), part::directory);
PartitionAssignment targetAssignment = new
PartitionAssignment(target.replicas(), clusterDescriber);
validateManualPartitionAssignment(targetAssignment,
OptionalInt.empty());
+ if (!allowRFChange) {
+ validatePartitionReplicationFactorUnchanged(part, target);
+ }
List<Integer> currentReplicas = Replicas.toList(part.replicas);
PartitionReassignmentReplicas reassignment =
@@ -2406,6 +2414,30 @@ public class ReplicationControlManager {
newPartInfo.elr);
}
+ private void
validatePartitionReplicationFactorUnchanged(PartitionRegistration part,
+
ReassignablePartition target) {
+ int currentReassignmentSetSize;
+ if (isReassignmentInProgress(part)) {
+ Set<Integer> set = new HashSet<>();
+ for (int r : part.replicas) {
+ set.add(r);
+ }
+ for (int r : part.addingReplicas) {
+ set.add(r);
+ }
+ for (int r : part.removingReplicas) {
+ set.remove(r);
+ }
+ currentReassignmentSetSize = set.size();
+ } else {
+ currentReassignmentSetSize = part.replicas.length;
+ }
+ if (currentReassignmentSetSize != target.replicas().size()) {
+ throw new InvalidReplicationFactorException("The replication
factor is changed from " +
+ currentReassignmentSetSize + " to " +
target.replicas().size());
+ }
+ }
+
private static final class IneligibleReplica {
private final int replicaId;
private final String reason;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index ef58e2bc7f5..0516b134c78 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -1922,6 +1922,135 @@ public class ReplicationControlManagerTest {
assertEquals(NONE_REASSIGNING,
replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void testAlterPartitionDisallowReplicationFactorChange(short
version) {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
+ ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}, new int[]
{0, 1, 2}, new int[] {0, 1, 2}});
+
+ ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(asList(
+ new
ReassignablePartition().setPartitionIndex(0).
+ setReplicas(asList(1, 2, 3)),
+ new
ReassignablePartition().setPartitionIndex(1).
+ setReplicas(asList(0, 1)),
+ new
ReassignablePartition().setPartitionIndex(2).
+ setReplicas(asList(0, 1, 2,
3)))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(asList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null),
+ new
ReassignablePartitionResponse().setPartitionIndex(1).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 2"),
+ new
ReassignablePartitionResponse().setPartitionIndex(2).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 4"))))),
+ alterResult.response());
+ ctx.replay(alterResult.records());
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new
ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(singletonList(new OngoingTopicReassignment().
+ setName("foo").setPartitions(singletonList(
+ new
OngoingPartitionReassignment().setPartitionIndex(0).
+
setRemovingReplicas(singletonList(0)).
+
setAddingReplicas(singletonList(3)).
+ setReplicas(asList(1, 2, 3,
0))))));
+ assertEquals(currentReassigning,
replication.listPartitionReassignments(singletonList(
+ new ListPartitionReassignmentsTopics().setName("foo").
+ setPartitionIndexes(asList(0, 1, 2))),
Long.MAX_VALUE));
+
+ // test alter replica factor not allow to change when partition
reassignment is ongoing
+ ControllerResult<AlterPartitionReassignmentsResponseData>
alterReassigningResult =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 1)))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 2"))))),
+ alterReassigningResult.response());
+
+ ControllerResult<AlterPartitionReassignmentsResponseData>
alterReassigningResult2 =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 2, 3)))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null))))),
+ alterReassigningResult2.response());
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void
testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short
version) {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
+ ctx.createTestTopic("foo", new int[][] {new int[] {0, 1,
2}}).topicId();
+
+ ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartition().setPartitionIndex(0).
+ setReplicas(asList(1,
2, 3)))))));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+ setErrorMessage(null).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
+ alterResult.response());
+ ctx.replay(alterResult.records());
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new
ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(singletonList(new OngoingTopicReassignment().
+ setName("foo").setPartitions(singletonList(
+ new
OngoingPartitionReassignment().setPartitionIndex(0).
+
setRemovingReplicas(singletonList(0)).
+
setAddingReplicas(singletonList(3)).
+ setReplicas(asList(1, 2, 3,
0))))));
+ assertEquals(currentReassigning,
replication.listPartitionReassignments(singletonList(
+ new ListPartitionReassignmentsTopics().setName("foo").
+ setPartitionIndexes(asList(0, 1, 2))),
Long.MAX_VALUE));
+
+ // test replica factor change check takes no effect when partition
reassignment is ongoing
+ ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult
=
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartition().setPartitionIndex(0).setReplicas(null))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new
AlterPartitionReassignmentsResponseData().setAllowReplicationFactorChange(false).setErrorMessage(null).
+ setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
+ cancelResult.response());
+ ctx.replay(cancelResult.records());
+ assertEquals(NONE_REASSIGNING,
replication.listPartitionReassignments(null, Long.MAX_VALUE));
+ }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectFencedBrokers(short version) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 137092da32c..33bf23d13d1 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -20,6 +20,7 @@ import org.apache.kafka.admin.BrokerMetadata;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -182,7 +183,8 @@ public class ReassignPartitionsCommand {
opts.options.valueOf(opts.interBrokerThrottleOpt),
opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
opts.options.valueOf(opts.timeoutOpt),
- Time.SYSTEM);
+ Time.SYSTEM,
+ opts.options.has(opts.disallowReplicationFactorChangeOpt));
} else if (opts.options.has(opts.cancelOpt)) {
cancelAssignment(adminClient,
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
@@ -761,7 +763,8 @@ public class ReassignPartitionsCommand {
Long interBrokerThrottle,
Long logDirThrottle,
Long timeoutMs,
- Time time
+ Time time,
+ boolean disallowReplicationFactorChange
) throws ExecutionException, InterruptedException,
JsonProcessingException, TerseException {
Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica,
String>> t0 = parseExecuteAssignmentArgs(reassignmentJson);
@@ -796,7 +799,7 @@ public class ReassignPartitionsCommand {
}
// Execute the partition reassignments.
- Map<TopicPartition, Throwable> errors =
alterPartitionReassignments(adminClient, proposedParts);
+ Map<TopicPartition, Throwable> errors =
alterPartitionReassignments(adminClient, proposedParts,
disallowReplicationFactorChange);
if (!errors.isEmpty()) {
throw new TerseException(
String.format("Error reassigning partition(s):%n%s",
@@ -941,15 +944,19 @@ public class ReassignPartitionsCommand {
/**
* Execute the given partition reassignments.
*
- * @param adminClient The admin client object to use.
- * @param reassignments A map from topic names to target replica
assignments.
- * @return A map from partition objects to error strings.
+ * @param adminClient The admin client object to
use.
+ * @param reassignments A map from topic names to
target replica assignments.
+ * @param disallowReplicationFactorChange Disallow replication factor
change or not.
+ * @return A map from partition objects
to error strings.
*/
static Map<TopicPartition, Throwable> alterPartitionReassignments(Admin
adminClient,
-
Map<TopicPartition, List<Integer>> reassignments) throws InterruptedException {
+
Map<TopicPartition, List<Integer>> reassignments,
+ boolean
disallowReplicationFactorChange) throws InterruptedException {
Map<TopicPartition, Optional<NewPartitionReassignment>> args = new
HashMap<>();
reassignments.forEach((part, replicas) -> args.put(part,
Optional.of(new NewPartitionReassignment(replicas))));
- Map<TopicPartition, KafkaFuture<Void>> results =
adminClient.alterPartitionReassignments(args).values();
+ AlterPartitionReassignmentsOptions options = new
AlterPartitionReassignmentsOptions();
+ options.allowReplicationFactorChange(!disallowReplicationFactorChange);
+ Map<TopicPartition, KafkaFuture<Void>> results =
adminClient.alterPartitionReassignments(args, options).values();
Map<TopicPartition, Throwable> errors = new HashMap<>();
for (Entry<TopicPartition, KafkaFuture<Void>> e : results.entrySet())
{
try {
@@ -1485,7 +1492,8 @@ public class ReassignPartitionsCommand {
opts.commandConfigOpt,
opts.interBrokerThrottleOpt,
opts.replicaAlterLogDirsThrottleOpt,
- opts.timeoutOpt
+ opts.timeoutOpt,
+ opts.disallowReplicationFactorChangeOpt
));
permittedArgs.put(opts.cancelOpt, Arrays.asList(
isBootstrapServer ? opts.bootstrapServerOpt :
opts.bootstrapControllerOpt,
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
index 2d31c5a902a..39541288712 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
@@ -42,6 +42,7 @@ public class ReassignPartitionsCommandOptions extends
CommandDefaultOptions {
final OptionSpec<Long> timeoutOpt;
final OptionSpec<?> additionalOpt;
final OptionSpec<?> preserveThrottlesOpt;
+ final OptionSpec<?> disallowReplicationFactorChangeOpt;
public ReassignPartitionsCommandOptions(String[] args) {
super(args);
@@ -115,6 +116,7 @@ public class ReassignPartitionsCommandOptions extends
CommandDefaultOptions {
additionalOpt = parser.accepts("additional", "Execute this
reassignment in addition to any " +
"other ongoing ones. This option can also be used to change the
throttle of an ongoing reassignment.");
preserveThrottlesOpt = parser.accepts("preserve-throttles", "Do not
modify broker or topic throttles.");
+ disallowReplicationFactorChangeOpt =
parser.accepts("disallow-replication-factor-change", "Denies the ability to
change a partition's replication factor as part of this reassignment through
adding validation against it.");
options = parser.parse(args);
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index 0dedf567c49..8c981e4eb44 100644
---
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -249,7 +249,7 @@ public class ReplicationQuotasTestRig {
ReassignPartitionsCommand.executeAssignment(adminClient, false,
ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment,
Collections.emptyMap()),
- config.throttle, -1L, 10000L, Time.SYSTEM);
+ config.throttle, -1L, 10000L, Time.SYSTEM, false);
//Await completion
waitForReassignmentToComplete();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index d7f6f937d97..a96bccd36ed 100644
---
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -93,6 +93,7 @@ import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAs
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = {
@@ -432,6 +433,23 @@ public class ReassignPartitionsCommandTest {
}
}
+ @ClusterTest
+ public void testDisallowReplicationFactorChange() {
+ createTopics();
+ String assignment = "{\"version\":1,\"partitions\":" +
+
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
+
+
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[0,1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\"]},"
+
+
"{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3],\"log_dirs\":[\"any\"]}" +
+ "]}";
+ try (Admin admin = clusterInstance.admin()) {
+ assertEquals("Error reassigning partition(s):\n" +
+ "bar-0: The replication factor is changed from 3
to 1\n" +
+ "foo-0: The replication factor is changed from 3
to 2\n" +
+ "foo-1: The replication factor is changed from 3
to 4",
+ assertThrows(TerseException.class, () ->
executeAssignment(admin, false, assignment, -1L, -1L, 10000L, Time.SYSTEM,
true)).getMessage());
+ }
+ }
+
private void createTopics() {
try (Admin admin =
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers()))) {
Map<Integer, List<Integer>> fooReplicasAssignments = new
HashMap<>();
@@ -654,7 +672,7 @@ public class ReassignPartitionsCommandTest {
Long replicaAlterLogDirsThrottle) throws
RuntimeException {
try (Admin admin =
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers()))) {
executeAssignment(admin, additional, reassignmentJson,
- interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L,
Time.SYSTEM);
+ interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L,
Time.SYSTEM, false);
} catch (ExecutionException | InterruptedException |
JsonProcessingException | TerseException e) {
throw new RuntimeException(e);
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index d383b56e755..793eba842d2 100644
---
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -161,7 +161,7 @@ public class ReassignPartitionsUnitTest {
reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 3));
reassignments.put(new TopicPartition("quux", 0), asList(1, 2, 3));
- Map<TopicPartition, Throwable> reassignmentResult =
alterPartitionReassignments(adminClient, reassignments);
+ Map<TopicPartition, Throwable> reassignmentResult =
alterPartitionReassignments(adminClient, reassignments, false);
assertEquals(1, reassignmentResult.size());
assertEquals(UnknownTopicOrPartitionException.class,
reassignmentResult.get(new TopicPartition("quux", 0)).getClass());
@@ -606,7 +606,7 @@ public class ReassignPartitionsUnitTest {
"{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
+
"{\"topic\":\"quux\",\"partition\":0,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
+
- "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected
reassignment with non-existent topic to fail").getCause().getMessage());
+ "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected
reassignment with non-existent topic to fail").getCause().getMessage());
}
}
@@ -619,7 +619,7 @@ public class ReassignPartitionsUnitTest {
"{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
+
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
+
- "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected
reassignment with non-existent broker id to fail").getMessage());
+ "]}", -1L, -1L, 10000L, Time.SYSTEM, false), "Expected
reassignment with non-existent broker id to fail").getMessage());
}
}
@@ -670,7 +670,7 @@ public class ReassignPartitionsUnitTest {
reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 4,
2));
reassignments.put(new TopicPartition("bar", 0), asList(2, 3));
- Map<TopicPartition, Throwable> reassignmentResult =
alterPartitionReassignments(adminClient, reassignments);
+ Map<TopicPartition, Throwable> reassignmentResult =
alterPartitionReassignments(adminClient, reassignments, false);
assertTrue(reassignmentResult.isEmpty());
assertEquals(String.join(System.lineSeparator(),
@@ -762,7 +762,7 @@ public class ReassignPartitionsUnitTest {
try (MockAdminClient adminClient = new
MockAdminClient.Builder().numBrokers(4).build()) {
addTopics(adminClient);
assertStartsWith("Unexpected character",
- assertThrows(AdminOperationException.class, () ->
executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L,
Time.SYSTEM)).getMessage());
+ assertThrows(AdminOperationException.class, () ->
executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L,
Time.SYSTEM, false)).getMessage());
}
}
}