This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9be8ca1d9b8 KAFKA-17689 Migrate ReassignReplicaMoveTest and
ReassignReplicaExpandTest to new test infra (#22343)
9be8ca1d9b8 is described below
commit 9be8ca1d9b86fb55d04bae2a2884af5bc65dc134
Author: majialong <[email protected]>
AuthorDate: Sat May 23 10:22:58 2026 +0800
KAFKA-17689 Migrate ReassignReplicaMoveTest and ReassignReplicaExpandTest
to new test infra (#22343)
Migrate `ReassignReplicaMoveTest` and `ReassignReplicaExpandTest` to new
test infra, and integrated into a single test file.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../integration/BaseReassignReplicaTest.java | 101 ----------------
.../integration/ReassignReplicaExpandTest.java | 31 -----
.../ReassignReplicaMoveAndExpandTest.java | 133 +++++++++++++++++++++
.../integration/ReassignReplicaMoveTest.java | 31 -----
4 files changed, 133 insertions(+), 163 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
deleted file mode 100644
index ff31f0dd248..00000000000
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
-import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-
-public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness
{
- protected final Integer broker0 = 0;
- protected final Integer broker1 = 1;
-
- /**
- * Cluster of two brokers
- * @return number of brokers in the cluster
- */
- @Override
- public int brokerCount() {
- return 2;
- }
-
- /**
- * Number of partitions in the '__remote_log_metadata' topic
- * @return number of partitions in the '__remote_log_metadata' topic
- */
- @Override
- public int numRemoteLogMetadataPartitions() {
- return 2;
- }
-
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
- final String topicA = "topicA";
- final String topicB = "topicB";
- final int p0 = 0;
- final int partitionCount = 1;
- final int replicationFactor = 1;
- final int maxBatchCountPerSegment = 1;
- final Map<Integer, List<Integer>> replicaAssignment = null;
- final boolean enableRemoteLogStorage = true;
- final List<Integer> metadataPartitions = new ArrayList<>();
- for (int i = 0; i < numRemoteLogMetadataPartitions(); i++) {
- metadataPartitions.add(i);
- }
-
- builder
- // create topicA with 50 partitions and 2 RF. Using 50
partitions to ensure that the user-partitions
- // are mapped to all the __remote_log_metadata partitions.
This is required to ensure that
- // TBRLMM able to handle the assignment of the newly created
replica to one of the already assigned
- // metadata partition
- .createTopic(topicA, 50, 2, maxBatchCountPerSegment,
- replicaAssignment, enableRemoteLogStorage)
- .expectUserTopicMappedToMetadataPartitions(topicA,
metadataPartitions)
- // create topicB with 1 partition and 1 RF
- .createTopic(topicB, partitionCount, replicationFactor,
maxBatchCountPerSegment,
- mkMap(mkEntry(p0, List.of(broker0))),
enableRemoteLogStorage)
- // send records to partition 0
- .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new
KeyValueSpec("k0", "v0"))
- .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new
KeyValueSpec("k1", "v1"))
- .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
- .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
- new KeyValueSpec("k2", "v2"))
- // The newly created replica gets mapped to one of the
metadata partition which is being actively
- // consumed by both the brokers
- .reassignReplica(topicB, p0, replicaIds())
- .expectLeader(topicB, p0, broker1, true)
- // produce some more events and verify the earliest local
offset
- .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
- .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
- // consume from the beginning of the topic to read data from
local and remote storage
- .expectFetchFromTieredStorage(broker1, topicB, p0, 3)
- .consume(topicB, p0, 0L, 4, 3);
- }
-
- /**
- * Replicas of the topic
- * @return the replica-ids of the topic
- */
- protected abstract List<Integer> replicaIds();
-}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
deleted file mode 100644
index ddcf84109af..00000000000
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import java.util.List;
-
-public final class ReassignReplicaExpandTest extends BaseReassignReplicaTest {
-
- /**
- * Expand the replication factor of the topic by changing the replica list
from 0 to 0, 1
- * @return the replica-ids of the topic
- */
- @Override
- protected List<Integer> replicaIds() {
- return List.of(broker0, broker1);
- }
-}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveAndExpandTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveAndExpandTest.java
new file mode 100644
index 00000000000..3c48f3cdc09
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveAndExpandTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
+
+public final class ReassignReplicaMoveAndExpandTest {
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
+
+ private static final Integer BROKER_0 = 0;
+ private static final Integer BROKER_1 = 1;
+
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
ReassignReplicaMoveAndExpandTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void
testReassignReplicaExpandWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeReassignReplicaTest(clusterInstance, GroupProtocol.CLASSIC,
List.of(BROKER_0, BROKER_1));
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void
testReassignReplicaExpandWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeReassignReplicaTest(clusterInstance, GroupProtocol.CONSUMER,
List.of(BROKER_0, BROKER_1));
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void
testReassignReplicaMoveWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeReassignReplicaTest(clusterInstance, GroupProtocol.CLASSIC,
List.of(BROKER_1));
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void
testReassignReplicaMoveWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeReassignReplicaTest(clusterInstance, GroupProtocol.CONSUMER,
List.of(BROKER_1));
+ }
+
+ private void executeReassignReplicaTest(ClusterInstance clusterInstance,
GroupProtocol groupProtocol, List<Integer> replicaIds) throws Exception {
+ final String topicA = "topicA";
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final List<Integer> metadataPartitions = new ArrayList<>();
+ for (int i = 0; i < NUM_REMOTE_LOG_METADATA_PARTITIONS; i++) {
+ metadataPartitions.add(i);
+ }
+
+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
+ builder
+ // create topicA with 50 partitions and 2 RF. Using 50
partitions to ensure that the user-partitions
+ // are mapped to all the __remote_log_metadata partitions.
This is required to ensure that
+ // TBRLMM able to handle the assignment of the newly created
replica to one of the already assigned
+ // metadata partition
+ .createTopic(topicA, 50, 2, maxBatchCountPerSegment,
+ null, enableRemoteLogStorage)
+ .expectUserTopicMappedToMetadataPartitions(topicA,
metadataPartitions)
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor,
maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, List.of(BROKER_0))),
enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(BROKER_0, topicB, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(BROKER_0, topicB, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // The newly created replica gets mapped to one of the
metadata partition which is being actively
+ // consumed by both the brokers
+ .reassignReplica(topicB, p0, replicaIds)
+ .expectLeader(topicB, p0, BROKER_1, true)
+ // produce some more events and verify the earliest local
offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from
local and remote storage
+ .expectFetchFromTieredStorage(BROKER_1, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+
+ Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
deleted file mode 100644
index e81a0405b69..00000000000
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaMoveTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.integration;
-
-import java.util.List;
-
-public final class ReassignReplicaMoveTest extends BaseReassignReplicaTest {
-
- /**
- * Move the replica of the topic from broker0 to broker1
- * @return the replica-ids of the topic
- */
- @Override
- protected List<Integer> replicaIds() {
- return List.of(broker1);
- }
-}