This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a5620420b3a KAFKA-17689 Migrate ListOffsetsTest to new test infra
(#22369)
a5620420b3a is described below
commit a5620420b3a841366861e9fd3faf3c6c2d9dc76d
Author: Eric Chang <[email protected]>
AuthorDate: Tue May 26 23:04:56 2026 +0800
KAFKA-17689 Migrate ListOffsetsTest to new test infra (#22369)
Migrate `ListOffsetsTest` to the new test infrastructure
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../storage/integration/ListOffsetsTest.java | 67 ++++++++++++++++------
1 file changed, 48 insertions(+), 19 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ListOffsetsTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ListOffsetsTest.java
index 972e3b69873..c2ca43abb74 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ListOffsetsTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/ListOffsetsTest.java
@@ -17,43 +17,58 @@
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import static
org.apache.kafka.common.record.internal.RecordBatch.NO_PARTITION_LEADER_EPOCH;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
+
+public final class ListOffsetsTest {
+
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
-public class ListOffsetsTest extends TieredStorageTestHarness {
- @Override
- public int brokerCount() {
- return 2;
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
ListOffsetsTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
}
- /**
- * We are running this test only for the Kraft mode, since ZK mode is
deprecated now. Note that:
- * 1. In Kraft mode, the leader-epoch gets bumped only for leader-election
(0 -> 1) and not for reassignment.
- */
- @ParameterizedTest(name = "{displayName}.groupProtocol={0}")
- @MethodSource("getTestGroupProtocolParametersAll")
- @Override
- public void executeTieredStorageTest(String groupProtocol) {
- super.executeTieredStorageTest(groupProtocol);
+ @ClusterTemplate("clusterConfig")
+ public void testListOffsetsWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeListOffsetsTest(clusterInstance, GroupProtocol.CLASSIC);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ @ClusterTemplate("clusterConfig")
+ public void testListOffsetsWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeListOffsetsTest(clusterInstance, GroupProtocol.CONSUMER);
+ }
+
+ private void executeListOffsetsTest(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws Exception {
final int broker0 = 0;
final int broker1 = 1;
final String topicA = "topicA";
@@ -62,6 +77,7 @@ public class ListOffsetsTest extends TieredStorageTestHarness
{
final long timestamp = time.milliseconds();
final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0,
List.of(broker0, broker1)));
+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
builder
.createTopic(topicA, 1, 2, 2, assignment, true)
// send records to partition 0 and expect the first segment to
be offloaded
@@ -120,5 +136,18 @@ public class ListOffsetsTest extends
TieredStorageTestHarness {
.expectListOffsets(topicA, p0, OffsetSpec.earliestLocal(), new
EpochEntry(1, 4))
.expectListOffsets(topicA, p0, OffsetSpec.latestTiered(), new
EpochEntry(NO_PARTITION_LEADER_EPOCH, 3))
.expectListOffsets(topicA, p0, OffsetSpec.latest(), new
EpochEntry(1, 6));
+
+ Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}