This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 76ab0cf4804 KAFKA-17689 Migrate OffloadAndConsumeFromLeaderTest and OffloadAndTxnConsumeFromLeaderTest to new test infra (#22396)
76ab0cf4804 is described below

commit 76ab0cf480400db865f7d82fb9dee1f4171a39d2
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Thu May 28 22:08:09 2026 +0800

    KAFKA-17689 Migrate OffloadAndConsumeFromLeaderTest and OffloadAndTxnConsumeFromLeaderTest to new test infra (#22396)
    
    Migrate `OffloadAndConsumeFromLeaderTest` and
    `OffloadAndTxnConsumeFromLeaderTest` to the new test infrastructure.
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../OffloadAndConsumeFromLeaderTest.java           | 83 +++++++++++++++-----
 .../OffloadAndTxnConsumeFromLeaderTest.java        | 91 ++++++++++++++--------
 2 files changed, 122 insertions(+), 52 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
index fad98604f72..539cd008168 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
@@ -16,31 +16,57 @@
  */
 package org.apache.kafka.tiered.storage.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
 /**
  * Test Cases:
  *    Elementary offloads and fetches from tiered storage.
  */
-public final class OffloadAndConsumeFromLeaderTest extends 
TieredStorageTestHarness {
+public final class OffloadAndConsumeFromLeaderTest {
+
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+    @SuppressWarnings("unused")
+    private static List<ClusterConfig> clusterConfig() {
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(createServerPropsForRemoteStorage(
+                        
OffloadAndConsumeFromLeaderTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                        BROKER_COUNT,
+                        NUM_REMOTE_LOG_METADATA_PARTITIONS))
+                .build());
+    }
 
-    /**
-     * Cluster of one broker
-     * @return number of brokers in the cluster
-     */
-    @Override
-    public int brokerCount() {
-        return 1;
+    @ClusterTemplate("clusterConfig")
+    public void 
testOffloadAndConsumeFromLeaderWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeOffloadAndConsumeFromLeaderTest(clusterInstance, 
GroupProtocol.CLASSIC);
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
-        final int broker = 0;
+    @ClusterTemplate("clusterConfig")
+    public void 
testOffloadAndConsumeFromLeaderWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeOffloadAndConsumeFromLeaderTest(clusterInstance, 
GroupProtocol.CONSUMER);
+    }
+
+    private void executeOffloadAndConsumeFromLeaderTest(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol) throws Exception {
+        final int broker0 = 0;
         final String topicA = "topicA";
         final String topicB = "topicB";
         final int p0 = 0;
@@ -48,10 +74,12 @@ public final class OffloadAndConsumeFromLeaderTest extends 
TieredStorageTestHarn
         final int replicationFactor = 1;
         final int oneBatchPerSegment = 1;
         final int twoBatchPerSegment = 2;
-        final Map<Integer, List<Integer>> replicaAssignment = null;
+        // Pin the partition to broker 0 so that the broker0-based 
expectations are deterministic
+        // regardless of how many brokers the cluster has.
+        final Map<Integer, List<Integer>> replicaAssignment = Map.of(p0, 
List.of(broker0));
         final boolean enableRemoteLogStorage = true;
 
-        builder
+        final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
                 /*
                  * (1) Create a topic which segments contain only one batch 
and produce three records
                  *       with a batch size of 1.
@@ -77,8 +105,8 @@ public final class OffloadAndConsumeFromLeaderTest extends 
TieredStorageTestHarn
                  */
                 .createTopic(topicA, partitionCount, replicationFactor, 
oneBatchPerSegment, replicaAssignment,
                         enableRemoteLogStorage)
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
                 .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
                         new KeyValueSpec("k2", "v2"))
@@ -107,9 +135,9 @@ public final class OffloadAndConsumeFromLeaderTest extends 
TieredStorageTestHarn
                 .createTopic(topicB, partitionCount, replicationFactor, 
twoBatchPerSegment, replicaAssignment,
                         enableRemoteLogStorage)
                 .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 4L)
-                .expectSegmentToBeOffloaded(broker, topicB, p0, 0,
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0,
                         new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", 
"v1"))
-                .expectSegmentToBeOffloaded(broker, topicB, p0, 2,
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 2,
                         new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"))
                 .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
                         new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"), new KeyValueSpec("k4", "v4"))
@@ -126,10 +154,23 @@ public final class OffloadAndConsumeFromLeaderTest 
extends TieredStorageTestHarn
                  *       - For topic B, two segments are present in the tiered 
storage, as asserted by the
                  *         previous sub-test-case.
                  */
-                .bounce(broker)
-                .expectFetchFromTieredStorage(broker, topicA, p0, 1)
+                .bounce(broker0)
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 1)
                 .consume(topicA, p0, 1L, 2, 1)
-                .expectFetchFromTieredStorage(broker, topicB, p0, 2)
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 2)
                 .consume(topicB, p0, 1L, 4, 3);
+
+        final Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
index 3f2a9ce1581..b5f99785e0f 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -17,68 +17,83 @@
 package org.apache.kafka.tiered.storage.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.FetchCountAndOp;
 import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Set;
 
 import static 
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
+import static 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
 
 /**
  * Test Cases:
  *    Elementary offloads and fetches from tiered storage using consumer with 
read_committed isolation level.
  */
-public final class OffloadAndTxnConsumeFromLeaderTest extends 
TieredStorageTestHarness {
+public final class OffloadAndTxnConsumeFromLeaderTest {
 
-    /**
-     * Cluster of one broker
-     * @return number of brokers in the cluster
-     */
-    @Override
-    public int brokerCount() {
-        return 1;
-    }
+    private static final int BROKER_COUNT = 3;
+    private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
 
-    @Override
-    public Properties overridingProps() {
-        Properties props = super.overridingProps();
+    @SuppressWarnings("unused")
+    private static List<ClusterConfig> clusterConfig() {
+        Map<String, String> serverProps = createServerPropsForRemoteStorage(
+                
OffloadAndTxnConsumeFromLeaderTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+                BROKER_COUNT,
+                NUM_REMOTE_LOG_METADATA_PARTITIONS);
         // Configure the remote-log index cache size to hold one entry to 
simulate eviction of cached index entries.
-        
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
 "1");
-        return props;
+        
serverProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
 "1");
+        return List.of(ClusterConfig.defaultBuilder()
+                .setTypes(Set.of(Type.KRAFT))
+                .setBrokers(BROKER_COUNT)
+                .setServerProperties(serverProps)
+                .build());
+    }
+
+    @ClusterTemplate("clusterConfig")
+    public void 
testOffloadAndTxnConsumeFromLeaderWithClassicGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeOffloadAndTxnConsumeFromLeaderTest(clusterInstance, 
GroupProtocol.CLASSIC);
     }
 
-    @Override
-    protected void overrideConsumerConfig(Map<String, Object> consumerConfig) {
-        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString());
+    @ClusterTemplate("clusterConfig")
+    public void 
testOffloadAndTxnConsumeFromLeaderWithConsumerGroupProtocol(ClusterInstance 
clusterInstance) throws Exception {
+        executeOffloadAndTxnConsumeFromLeaderTest(clusterInstance, 
GroupProtocol.CONSUMER);
     }
 
-    @Override
-    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
-        final int broker = 0;
+    private void executeOffloadAndTxnConsumeFromLeaderTest(ClusterInstance 
clusterInstance, GroupProtocol groupProtocol) throws Exception {
+        final int broker0 = 0;
         final String topicA = "topicA";
         final int p0 = 0;
         final int partitionCount = 1;
         final int replicationFactor = 1;
         final int oneBatchPerSegment = 1;
-        final Map<Integer, List<Integer>> replicaAssignment = null;
+        // Pin the partition to broker 0 so that the broker0-based 
expectations are deterministic
+        // regardless of how many brokers the cluster has.
+        final Map<Integer, List<Integer>> replicaAssignment = Map.of(p0, 
List.of(broker0));
         final boolean enableRemoteLogStorage = true;
 
-        builder
+        final TieredStorageTestBuilder builder = new TieredStorageTestBuilder()
                 .createTopic(topicA, partitionCount, replicationFactor, 
oneBatchPerSegment, replicaAssignment,
                         enableRemoteLogStorage)
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 4, new 
KeyValueSpec("k4", "v4"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 5, new 
KeyValueSpec("k5", "v5"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new 
KeyValueSpec("k4", "v4"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 5, new 
KeyValueSpec("k5", "v5"))
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 6L)
                 .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
                         new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"), new KeyValueSpec("k4", "v4"),
@@ -89,8 +104,22 @@ public final class OffloadAndTxnConsumeFromLeaderTest 
extends TieredStorageTestH
                 // Total number of uploaded remote segments = 6. Total number 
of index fetches = (6 * (6 + 1)) / 2 = 21
                 // Note that we skip the index fetch when the txn-index is 
empty, so the effective index fetch count
                 // should be same as the segment count.
-                .expectFetchFromTieredStorage(broker, topicA, p0, 
getRemoteFetchCount())
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 
getRemoteFetchCount())
                 .consume(topicA, p0, 0L, 7, 6);
+
+        final Map<String, Object> extraConsumerProps = Map.of(
+                ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT),
+                ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString()
+        );
+        try (TieredStorageTestContext context = new 
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+            try {
+                for (TieredStorageTestAction action : builder.complete()) {
+                    action.execute(context);
+                }
+            } finally {
+                context.printReport(System.out);
+            }
+        }
     }
 
     private static RemoteFetchCount getRemoteFetchCount() {

Reply via email to