This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c0041e9c9e9 KAFKA-17689 Migrate ReassignReplicaShrinkTest and
RollAndOffloadActiveSegmentTest to new test infra (#22376)
c0041e9c9e9 is described below
commit c0041e9c9e9386ad7e3f70b7c09c7a0b9c858827
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed May 27 23:49:21 2026 +0800
KAFKA-17689 Migrate ReassignReplicaShrinkTest and
RollAndOffloadActiveSegmentTest to new test infra (#22376)
Migrate `ReassignReplicaShrinkTest` and
`RollAndOffloadActiveSegmentTest` to the new test infrastructure.
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../integration/ReassignReplicaShrinkTest.java | 66 +++++++++++++++------
.../RollAndOffloadActiveSegmentTest.java | 67 ++++++++++++++++++----
2 files changed, 103 insertions(+), 30 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
index 0dee8b0bda6..10604a92b46 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java
@@ -16,38 +16,53 @@
*/
package org.apache.kafka.tiered.storage.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
-public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness {
+public final class ReassignReplicaShrinkTest {
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 2;
- /**
- * Cluster of two brokers
- * @return number of brokers in the cluster
- */
- @Override
- public int brokerCount() {
- return 2;
+ @SuppressWarnings("unused")
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
ReassignReplicaShrinkTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
}
- /**
- * Number of partitions in the '__remote_log_metadata' topic
- * @return number of partitions in the '__remote_log_metadata' topic
- */
- @Override
- public int numRemoteLogMetadataPartitions() {
- return 2;
+ @ClusterTemplate("clusterConfig")
+ public void
testReassignReplicaShrinkWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeReassignReplicaShrinkTest(clusterInstance,
GroupProtocol.CLASSIC);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ @ClusterTemplate("clusterConfig")
+ public void
testReassignReplicaShrinkWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeReassignReplicaShrinkTest(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private void executeReassignReplicaShrinkTest(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws Exception {
final int broker0 = 0;
final int broker1 = 1;
final String topicA = "topicA";
@@ -62,7 +77,7 @@ public final class ReassignReplicaShrinkTest extends
TieredStorageTestHarness {
mkEntry(p1, List.of(broker1, broker0))
);
- builder
+ final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
// create topicA with 2 partitions and 2 RF
.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment,
replicaAssignment, enableRemoteLogStorage)
@@ -100,5 +115,18 @@ public final class ReassignReplicaShrinkTest extends
TieredStorageTestHarness {
.consume(topicA, p0, 0L, 4, 3)
.expectFetchFromTieredStorage(broker0, topicA, p1, 3)
.consume(topicA, p1, 0L, 4, 3);
+
+ final Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
index 5009793b94b..e960c234760 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
@@ -16,39 +16,71 @@
*/
package org.apache.kafka.tiered.storage.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
/**
* Test to verify that the active segment is rolled and uploaded to remote
storage when the segment breaches the
* local log retention policy.
*/
-public class RollAndOffloadActiveSegmentTest extends TieredStorageTestHarness {
+public final class RollAndOffloadActiveSegmentTest {
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+ @SuppressWarnings("unused")
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
RollAndOffloadActiveSegmentTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
+ }
- @Override
- public int brokerCount() {
- return 1;
+ @ClusterTemplate("clusterConfig")
+ public void
testRollAndOffloadActiveSegmentWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeRollAndOffloadActiveSegmentTest(clusterInstance,
GroupProtocol.CLASSIC);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ @ClusterTemplate("clusterConfig")
+ public void
testRollAndOffloadActiveSegmentWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeRollAndOffloadActiveSegmentTest(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private void executeRollAndOffloadActiveSegmentTest(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws Exception {
final int broker0 = 0;
final String topicA = "topicA";
final int p0 = 0;
final int partitionCount = 1;
final int replicationFactor = 1;
final int maxBatchCountPerSegment = 1;
- final Map<Integer, List<Integer>> replicaAssignment = null;
+ // Pin the partition to broker 0 so the broker0-based expectations are
deterministic
+ // regardless of how many brokers the cluster has.
+ final Map<Integer, List<Integer>> replicaAssignment = Map.of(p0,
List.of(broker0));
final boolean enableRemoteLogStorage = true;
- // Create topicA with 1 partition, 1 RF and enabled with remote
storage.
- builder.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, replicaAssignment,
+ final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
+ // Create topicA with 1 partition, 1 RF and enabled with
remote storage.
+ .createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, replicaAssignment,
enableRemoteLogStorage)
// update the topic config such that it triggers the rolling
of the active segment
.updateTopicConfig(topicA, configsToBeAdded(), List.of())
@@ -63,9 +95,22 @@ public class RollAndOffloadActiveSegmentTest extends
TieredStorageTestHarness {
// consume from the beginning of the topic to read data from
local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 4)
.consume(topicA, p0, 0L, 4, 4);
+
+ final Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
- private Map<String, String> configsToBeAdded() {
+ private static Map<String, String> configsToBeAdded() {
// Update localLog retentionMs to 1 ms and segment roll-time to 10 ms
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1");