This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8c079f405ef KAFKA-17689 Migrate TieredStorageTestHarness to new test
infra (AlterLogDirTest) (#22306)
8c079f405ef is described below
commit 8c079f405ef6db002cfd785e5bd6f58974813c2a
Author: Ken Huang <[email protected]>
AuthorDate: Thu May 21 15:58:31 2026 +0800
KAFKA-17689 Migrate TieredStorageTestHarness to new test infra
(AlterLogDirTest) (#22306)
Migrate `AlterLogDirTest` to the new test infrastructure
Reviewers: Chia-Ping Tsai <[email protected]>, PoAn Yang
<[email protected]>, TaiJuWu <[email protected]>
---
.../storage/integration/AlterLogDirTest.java | 59 +++++++++++++++++++---
.../TransactionsWithTieredStoreTest.java | 18 +------
.../storage/utils/TieredStorageTestUtils.java | 19 +++++++
3 files changed, 72 insertions(+), 24 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
index b9230b20812..d406d8e2eba 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
@@ -16,24 +16,53 @@
*/
package org.apache.kafka.tiered.storage.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
-import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
-public final class AlterLogDirTest extends TieredStorageTestHarness {
+public final class AlterLogDirTest {
- @Override
- public int brokerCount() {
- return 2;
+ private static final int BROKER_COUNT = 3;
+
+ private static List<ClusterConfig> clusterConfig() {
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
+ .setBrokers(BROKER_COUNT)
+ .setDisksPerBroker(2)
+ .setServerProperties(createServerPropsForRemoteStorage(
+
AlterLogDirTest.class.getSimpleName().toLowerCase(Locale.ROOT),
+ BROKER_COUNT,
+ 5))
+ .build());
+ }
+
+ @ClusterTemplate("clusterConfig")
+ public void testAlterLogDirWithClassicGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeAlterLogDirTest(clusterInstance, GroupProtocol.CLASSIC);
}
- @Override
- protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ @ClusterTemplate("clusterConfig")
+ public void testAlterLogDirWithConsumerGroupProtocol(ClusterInstance
clusterInstance) throws Exception {
+ executeAlterLogDirTest(clusterInstance, GroupProtocol.CONSUMER);
+ }
+
+ private void executeAlterLogDirTest(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws Exception {
final String topicB = "topicB";
final int p0 = 0;
final int partitionCount = 1;
@@ -43,8 +72,9 @@ public final class AlterLogDirTest extends
TieredStorageTestHarness {
final int broker0 = 0;
final int broker1 = 1;
+ TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
builder
- // create topicB with 1 partition and 1 RF
+ // create topicB with 1 partition and 2 RF
.createTopic(topicB, partitionCount, replicationFactor,
maxBatchCountPerSegment,
mkMap(mkEntry(p0, List.of(broker1, broker0))),
enableRemoteLogStorage)
// send records to partition 0
@@ -63,5 +93,18 @@ public final class AlterLogDirTest extends
TieredStorageTestHarness {
// consume from the beginning of the topic to read data from
local and remote storage
.expectFetchFromTieredStorage(broker0, topicB, p0, 3)
.consume(topicB, p0, 0L, 4, 3);
+
+ Map<String, Object> extraConsumerProps = Map.of(
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT)
+ );
+ try (TieredStorageTestContext context = new
TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
+ try {
+ for (TieredStorageTestAction action : builder.complete()) {
+ action.execute(context);
+ }
+ } finally {
+ context.printReport(System.out);
+ }
+ }
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
index 0eab6c6f685..dff3347248a 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -23,11 +23,9 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterTemplate;
-import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.config.ServerLogConfigs;
-import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.test.TestUtils;
import
org.apache.kafka.tiered.storage.integration.TransactionsTestHelper.TransactionHooks;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@@ -35,12 +33,11 @@ import
org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
-import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
+import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
/**
@@ -56,10 +53,7 @@ public class TransactionsWithTieredStoreTest {
private static final int BROKER_COUNT = 3;
private static Map<String, String> baseServerProperties() {
- String storageDirPath = TestUtils.tempDirectory(
- "kafka-remote-tier-" + TEST_CLASS_NAME).getAbsolutePath();
-
- Map<String, String> serverProps = new HashMap<>();
+ Map<String, String> serverProps =
createServerPropsForRemoteStorage(TEST_CLASS_NAME, BROKER_COUNT, 3);
serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
"false");
serverProps.put("offsets.topic.num.partitions", "1");
serverProps.put("transaction.state.log.num.partitions", "3");
@@ -70,14 +64,6 @@ public class TransactionsWithTieredStoreTest {
serverProps.put("auto.leader.rebalance.enable", "false");
serverProps.put("group.initial.rebalance.delay.ms", "0");
serverProps.put("transaction.abort.timed.out.transaction.cleanup.interval.ms",
"200");
-
- Properties tieredProps = createPropsForRemoteStorage(
- TEST_CLASS_NAME, storageDirPath, BROKER_COUNT, 3, new
Properties());
- tieredProps.forEach((k, v) -> serverProps.put(k.toString(),
v.toString()));
-
-
serverProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
- TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME);
-
return serverProps;
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
index bc4ffdf238f..cd0d243da04 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
@@ -164,6 +165,24 @@ public class TieredStorageTestUtils {
return overridingProps;
}
+ public static Map<String, String> createServerPropsForRemoteStorage(
+ String testClassName,
+ int brokerCount,
+ int numRemoteLogMetadataPartitions
+ ) {
+ String storageDirPath = org.apache.kafka.test.TestUtils
+ .tempDirectory("kafka-remote-tier-" +
testClassName).getAbsolutePath();
+ Properties tieredProps = createPropsForRemoteStorage(
+ testClassName, storageDirPath, brokerCount,
numRemoteLogMetadataPartitions, new Properties());
+ Map<String, String> serverProps = new HashMap<>();
+ tieredProps.forEach((k, v) -> serverProps.put(k.toString(),
v.toString()));
+ serverProps.put(
+ REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
+ TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME
+ );
+ return serverProps;
+ }
+
public static Map<String, String>
createTopicConfigForRemoteStorage(boolean enableRemoteStorage,
int maxRecordBatchPerSegment) {
Map<String, String> topicProps = new HashMap<>();