This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3fcf9f1c415 KAFKA-17689 Migrate Disable/EnableRemoteLogOnTopicTest to
new test infra (#22377)
3fcf9f1c415 is described below
commit 3fcf9f1c415c39dcf4ce6ae7db25810a8ba95755
Author: S.Y. Wang <[email protected]>
AuthorDate: Thu May 28 00:13:13 2026 +0900
KAFKA-17689 Migrate Disable/EnableRemoteLogOnTopicTest to new test infra
(#22377)
Migrate DisableRemoteLogOnTopicTest and EnableRemoteLogOnTopicTest from
TieredStorageTestHarness to the new test infra (@ClusterTemplate with an
injected ClusterInstance).
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../integration/DisableRemoteLogOnTopicTest.java | 64 ++++++++++++++++------
.../integration/EnableRemoteLogOnTopicTest.java | 56 ++++++++++++++++---
2 files changed, 97 insertions(+), 23 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
index 18d5b4e5b02..a94ca5a8df5 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
@@ -16,38 +16,56 @@
*/
package org.apache.kafka.tiered.storage.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
+
+public final class DisableRemoteLogOnTopicTest {
+
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
DisableRemoteLogOnTopicTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
+ }
-public final class DisableRemoteLogOnTopicTest extends
TieredStorageTestHarness {
-
- @Override
- public int brokerCount() {
- return 2;
+ @ClusterTemplate("clusterConfig")
+ public void
testDisableRemoteLogOnTopicWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeDisableRemoteLogOnTopicTest(clusterInstance,
GroupProtocol.CLASSIC);
}
- @ParameterizedTest(name = "{displayName}.groupProtocol={0}")
- @MethodSource("getTestGroupProtocolParametersAll")
- @Override
- public void executeTieredStorageTest(String groupProtocol) {
- super.executeTieredStorageTest(groupProtocol);
+ @ClusterTemplate("clusterConfig")
+ public void
testDisableRemoteLogOnTopicWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeDisableRemoteLogOnTopicTest(clusterInstance,
GroupProtocol.CONSUMER);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ private static void executeDisableRemoteLogOnTopicTest(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws Exception {
final int broker0 = 0;
final int broker1 = 1;
final String topicA = "topicA";
@@ -74,6 +92,7 @@ public final class DisableRemoteLogOnTopicTest extends
TieredStorageTestHarness
deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false");
deleteOnDisable.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
"true");
+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
builder
.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, assignment,
enableRemoteLogStorage)
@@ -123,5 +142,18 @@ public final class DisableRemoteLogOnTopicTest extends
TieredStorageTestHarness
.expectEmptyRemoteStorage(topicA, p0)
// verify the local log is still consumable
.consume(topicA, p0, 3L, 1, 0);
+
+ Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
index 5717b91739e..caf637ad3d6 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
@@ -16,26 +16,54 @@
*/
package org.apache.kafka.tiered.storage.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
-public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness
{
+public final class EnableRemoteLogOnTopicTest {
- @Override
- public int brokerCount() {
- return 2;
+ private static final int BROKER_COUNT = 3;
+ private static final int NUM_REMOTE_LOG_METADATA_PARTITIONS = 5;
+
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
DisableRemoteLogOnTopicTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ NUM_REMOTE_LOG_METADATA_PARTITIONS))
+ .build());
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void
testEnableRemoteLogOnTopicWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeEnableRemoteLogOnTopicTest(clusterInstance,
GroupProtocol.CLASSIC);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ @ClusterTemplate("clusterConfig")
+ public void
testEnableRemoteLogOnTopicWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeEnableRemoteLogOnTopicTest(clusterInstance,
GroupProtocol.CONSUMER);
+ }
+
+ private static void executeEnableRemoteLogOnTopicTest(ClusterInstance
clusterInstance, GroupProtocol groupProtocol) throws Exception {
final int broker0 = 0;
final int broker1 = 1;
final String topicA = "topicA";
@@ -50,6 +78,7 @@ public final class EnableRemoteLogOnTopicTest extends
TieredStorageTestHarness {
mkEntry(p1, List.of(broker1, broker0))
);
+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
builder
.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, assignment,
enableRemoteLogStorage)
@@ -83,5 +112,18 @@ public final class EnableRemoteLogOnTopicTest extends
TieredStorageTestHarness {
// consume from the beginning of the topic to read data from
local and remote storage for partition 1
.expectFetchFromTieredStorage(broker1, topicA, p1, 4)
.consume(topicA, p1, 0L, 5, 4);
+
+ Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}