This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55a9d4069b3 KAFKA-17689 Change TieredStorageTestContext to use
ClusterInstance (#22294)
55a9d4069b3 is described below
commit 55a9d4069b3ee6f10b5a8edfecaa1414e67c9a07
Author: PoAn Yang <[email protected]>
AuthorDate: Mon May 18 11:40:32 2026 +0900
KAFKA-17689 Change TieredStorageTestContext to use ClusterInstance (#22294)
Make TieredStorageTestContext support ClusterInstance.
Reviewers: Ken Huang <[email protected]>, Murali Basani
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control-storage.xml | 1 +
.../storage/HarnessBackedClusterInstance.java | 196 +++++++++++++++++++++
.../tiered/storage/TieredStorageTestContext.java | 115 ++++++------
.../tiered/storage/TieredStorageTestHarness.java | 18 +-
.../tiered/storage/TieredStorageTestReport.java | 2 +-
.../OffloadAndTxnConsumeFromLeaderTest.java | 2 +-
7 files changed, 267 insertions(+), 68 deletions(-)
diff --git a/build.gradle b/build.gradle
index 9cee79e0610..715fe2f4163 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2467,6 +2467,7 @@ project(':storage') {
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation testFixtures(project(':storage:storage-api'))
+ testImplementation project(':metadata')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':test-common:test-common-util')
diff --git a/checkstyle/import-control-storage.xml
b/checkstyle/import-control-storage.xml
index 040e3761e37..3599ca5fffe 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -118,6 +118,7 @@
<allow pkg="org.apache.kafka.tiered.storage" />
<allow pkg="kafka.api" />
+ <allow pkg="kafka.integration" />
<allow pkg="kafka.log" />
<allow pkg="kafka.server" />
<allow pkg="kafka.utils" />
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java
new file mode 100644
index 00000000000..45726f9dfc7
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import kafka.integration.KafkaServerTestHarness;
+import kafka.server.ControllerServer;
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.common.network.ConnectionMode;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.fault.FaultHandlerException;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import scala.jdk.javaapi.CollectionConverters;
+
+/**
+ * A {@link ClusterInstance} implementation backed by a {@link
KafkaServerTestHarness}.
+ * This allows {@link TieredStorageTestContext} to depend only on {@link
ClusterInstance}
+ * rather than on the concrete harness class.
+ */
+public class HarnessBackedClusterInstance implements ClusterInstance {
+
+ private final KafkaServerTestHarness harness;
+
+ public HarnessBackedClusterInstance(KafkaServerTestHarness harness) {
+ this.harness = harness;
+ }
+
+ @Override
+ public Type type() {
+ return Type.KRAFT;
+ }
+
+ @Override
+ public ClusterConfig config() {
+ return ClusterConfig.defaultBuilder()
+ .setBrokers(harness.brokers().size())
+ .setControllers(harness.controllerServers().size())
+ .setBrokerSecurityProtocol(harness.securityProtocol())
+ .setBrokerListenerName(harness.listenerName())
+ .setControllerListenerName(controllerListenerName())
+ .setMetadataVersion(harness.metadataVersion())
+ .build();
+ }
+
+ @Override
+ public Set<Integer> controllerIds() {
+ return controllers().keySet();
+ }
+
+ @Override
+ public ListenerName clientListener() {
+ return harness.listenerName();
+ }
+
+ @Override
+ public ListenerName controllerListenerName() {
+ return controllers().values().stream()
+ .map(c -> new
ListenerName(c.config().controllerListenerNames().get(0)))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No controllers
available"));
+ }
+
+ @Override
+ public String bootstrapServers() {
+ return harness.bootstrapServers(harness.listenerName());
+ }
+
+ @Override
+ public String bootstrapControllers() {
+ throw new UnsupportedOperationException("bootstrapControllers() is not
supported in HarnessBackedClusterInstance");
+ }
+
+ @Override
+ public String clusterId() {
+ return brokers().values().stream()
+ .map(KafkaBroker::clusterId)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No brokers
available"));
+ }
+
+ @Override
+ public Map<Integer, KafkaBroker> brokers() {
+ return CollectionConverters.asJava(harness.brokers()).stream()
+ .collect(Collectors.toMap(b -> b.config().brokerId(), b -> b));
+ }
+
+ @Override
+ public Map<Integer, ControllerServer> controllers() {
+ return
CollectionConverters.asJava(harness.controllerServers()).stream()
+ .collect(Collectors.toMap(c -> c.config().nodeId(), c -> c));
+ }
+
+ @Override
+ public void shutdownBroker(int brokerId) {
+ harness.killBroker(brokerId);
+ }
+
+ @Override
+ public void startBroker(int brokerId) {
+ harness.startBroker(brokerId);
+ }
+
+ @Override
+ public void start() {
+ throw new UnsupportedOperationException("start() is managed by
KafkaServerTestHarness");
+ }
+
+ @Override
+ public boolean started() {
+ return true;
+ }
+
+ @Override
+ public void stop() {
+ throw new UnsupportedOperationException("stop() is managed by
KafkaServerTestHarness");
+ }
+
+ @Override
+ public boolean stopped() {
+ return false;
+ }
+
+ @Override
+ public void waitForReadyBrokers() throws InterruptedException {
+ Map<Integer, KafkaBroker> brokerMap = brokers();
+
+ // Step 1: wait until a controller marks all brokers as registered and
unfenced
+ ControllerServer controllerServer =
controllers().values().iterator().next();
+ try {
+
controllerServer.controller().waitForReadyBrokers(brokerMap.size()).get();
+ } catch (ExecutionException e) {
+ throw new AssertionError("Failed while waiting for brokers to
become ready", e);
+ }
+
+ // Step 2: wait until each broker's metadata cache knows about all
alive brokers
+ Set<Integer> brokerIds = brokerMap.keySet();
+ TestUtils.waitForCondition(
+ () -> brokerMap.values().stream().allMatch(
+ broker -> brokerIds.stream().allMatch(id ->
broker.metadataCache().hasAliveBroker(id))
+ ),
+ "Timed out waiting for metadata cache to reflect all alive brokers"
+ );
+ }
+
+ @Override
+ public Optional<FaultHandlerException> firstFatalException() {
+ return Optional.ofNullable(harness.faultHandler().firstException());
+ }
+
+ @Override
+ public Optional<FaultHandlerException> firstNonFatalException() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Map<String, Object> setClientSslConfig(Map<String, Object> configs)
{
+ if (harness.trustStoreFile().isEmpty()) {
+ return configs;
+ }
+ try {
+ Map<String, Object> props = new HashMap<>(configs);
+ props.putAll(new
TestSslUtils.SslConfigsBuilder(ConnectionMode.CLIENT)
+ .useExistingTrustStore(harness.trustStoreFile().get())
+ .build());
+ return props;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to build client SSL config", e);
+ }
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 9a7e94b2058..428cc5f7662 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.tiered.storage;
-import kafka.utils.TestUtils;
-
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
@@ -27,24 +24,25 @@ import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@@ -52,66 +50,62 @@ import
org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import scala.Function0;
-import scala.Function1;
-import scala.jdk.javaapi.CollectionConverters;
-
import static
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
public final class TieredStorageTestContext implements AutoCloseable {
- private final TieredStorageTestHarness harness;
- private final Serializer<String> ser = new StringSerializer();
- private final Deserializer<String> de = new StringDeserializer();
+ private final ClusterInstance cluster;
+ private final Map<String, Object> extraConsumerProps;
private final Map<String, TopicSpec> topicSpecs = new HashMap<>();
private final TieredStorageTestReport testReport;
- private volatile KafkaProducer<String, String> producer;
+ private volatile Producer<String, String> producer;
private volatile Consumer<String, String> consumer;
private volatile Admin admin;
private volatile List<LocalTieredStorage> remoteStorageManagers;
private volatile List<BrokerLocalStorage> localStorages;
- public TieredStorageTestContext(TieredStorageTestHarness harness) {
- this.harness = harness;
+ public TieredStorageTestContext(ClusterInstance cluster, Map<String,
Object> extraConsumerProps) {
+ this.cluster = cluster;
+ this.extraConsumerProps = extraConsumerProps;
this.testReport = new TieredStorageTestReport(this);
initClients();
initContext();
}
private void initClients() {
- // rediscover the new bootstrap-server port in case of broker restarts
- ListenerName listenerName = harness.listenerName();
- Properties commonOverrideProps = new Properties();
- commonOverrideProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
harness.bootstrapServers(listenerName));
-
// Set a producer linger of 60 seconds, in order to optimistically
generate batches of
// records with a pre-determined size.
- Properties producerOverrideProps = new Properties();
- producerOverrideProps.put(LINGER_MS_CONFIG,
String.valueOf(TimeUnit.SECONDS.toMillis(60)));
- producerOverrideProps.putAll(commonOverrideProps);
+ Map<String, Object> producerProps = new HashMap<>();
+ producerProps.put(LINGER_MS_CONFIG,
String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ consumerProps.putAll(extraConsumerProps);
- producer = harness.createProducer(ser, ser, producerOverrideProps);
- consumer = harness.createConsumer(de, de, commonOverrideProps,
- CollectionConverters.asScala(List.<String>of()).toList());
- admin = harness.createAdminClient(listenerName, commonOverrideProps);
+ producer = cluster.producer(producerProps);
+ consumer = cluster.consumer(consumerProps);
+ admin = cluster.admin(Map.of());
}
private void initContext() {
- remoteStorageManagers =
TieredStorageTestHarness.remoteStorageManagers(harness.aliveBrokers());
- localStorages =
TieredStorageTestHarness.localStorages(harness.aliveBrokers());
+ remoteStorageManagers =
TieredStorageTestHarness.remoteStorageManagers(cluster.aliveBrokers().values());
+ localStorages =
TieredStorageTestHarness.localStorages(cluster.aliveBrokers().values());
}
public void createTopic(TopicSpec spec) throws ExecutionException,
InterruptedException {
@@ -124,7 +118,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
newTopic.configs(spec.properties());
admin.createTopics(List.of(newTopic)).all().get();
- TestUtils.waitForAllPartitionsMetadata(harness.brokers(),
spec.topicName(), spec.partitionCount());
+ cluster.waitTopicCreation(spec.topicName(), spec.partitionCount());
synchronized (this) {
topicSpecs.put(spec.topicName(), spec);
}
@@ -144,7 +138,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
Map<String, NewPartitions> partitionsMap = Map.of(spec.topicName(),
newPartitions);
admin.createPartitions(partitionsMap).all().get();
- TestUtils.waitForAllPartitionsMetadata(harness.brokers(),
spec.topicName(), spec.partitionCount());
+ cluster.waitTopicCreation(spec.topicName(), spec.partitionCount());
}
public void updateTopicConfig(String topic,
@@ -178,8 +172,8 @@ public final class TieredStorageTestContext implements
AutoCloseable {
admin.incrementalAlterConfigs(configsMap, alterOptions).all().get(30,
TimeUnit.SECONDS);
}
- public void deleteTopic(String topic) {
- TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers(),
harness.controllerServers());
+ public void deleteTopic(String topic) throws InterruptedException,
ExecutionException {
+ admin.deleteTopics(List.of(topic)).all().get();
}
/**
@@ -202,7 +196,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
public List<ConsumerRecord<String, String>> consume(TopicPartition
topicPartition,
Integer
expectedTotalCount,
- Long fetchOffset) {
+ Long fetchOffset)
throws InterruptedException {
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, fetchOffset);
@@ -210,15 +204,16 @@ public final class TieredStorageTestContext implements
AutoCloseable {
long pollTimeoutMs = 100L;
String sep = System.lineSeparator();
List<ConsumerRecord<String, String>> records = new ArrayList<>();
- Function1<ConsumerRecords<String, String>, Object> pollAction =
polledRecords -> {
- polledRecords.forEach(records::add);
- return records.size() >= expectedTotalCount;
- };
- Function0<String> messageSupplier = () ->
- String.format("Could not consume %d records of %s from offset
%d in %d ms. %d message(s) consumed:%s%s",
- expectedTotalCount, topicPartition, fetchOffset,
timeoutMs, records.size(), sep,
-
records.stream().map(Object::toString).collect(Collectors.joining(sep)));
- TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier,
timeoutMs, pollTimeoutMs);
+ TestUtils.waitForCondition(
+ () -> {
+
consumer.poll(Duration.ofMillis(pollTimeoutMs)).forEach(records::add);
+ return records.size() >= expectedTotalCount;
+ },
+ timeoutMs,
+ () -> String.format("Could not consume %d records of %s from
offset %d in %d ms. %d message(s) consumed:%s%s",
+ expectedTotalCount, topicPartition, fetchOffset,
timeoutMs, records.size(), sep,
+
records.stream().map(Object::toString).collect(Collectors.joining(sep)))
+ );
return records;
}
@@ -237,9 +232,9 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
public void bounce(int brokerId) {
- harness.killBroker(brokerId);
- boolean allBrokersDead = harness.aliveBrokers().isEmpty();
- harness.startBroker(brokerId);
+ cluster.shutdownBroker(brokerId);
+ boolean allBrokersDead = cluster.aliveBrokers().isEmpty();
+ cluster.startBroker(brokerId);
if (allBrokersDead) {
reinitClients();
}
@@ -247,13 +242,13 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
public void stop(int brokerId) {
- harness.killBroker(brokerId);
+ cluster.shutdownBroker(brokerId);
initContext();
}
public void start(int brokerId) {
- boolean allBrokersDead = harness.aliveBrokers().isEmpty();
- harness.startBroker(brokerId);
+ boolean allBrokersDead = cluster.aliveBrokers().isEmpty();
+ cluster.startBroker(brokerId);
if (allBrokersDead) {
reinitClients();
}
@@ -265,7 +260,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
boolean isStopped) throws IOException {
BrokerLocalStorage brokerLocalStorage;
if (isStopped) {
- brokerLocalStorage =
TieredStorageTestHarness.localStorages(harness.brokers())
+ brokerLocalStorage =
TieredStorageTestHarness.localStorages(cluster.brokers().values())
.stream()
.filter(bls -> bls.getBrokerId() == brokerId)
.findFirst()
@@ -283,7 +278,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
public LocalTieredStorageSnapshot takeTieredStorageSnapshot() {
- int aliveBrokerId = harness.aliveBrokers().head().config().brokerId();
+ int aliveBrokerId =
cluster.aliveBrokers().values().iterator().next().config().brokerId();
return
LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManager(aliveBrokerId));
}
@@ -311,8 +306,8 @@ public final class TieredStorageTestContext implements
AutoCloseable {
return localStorages;
}
- public Deserializer<String> de() {
- return de;
+ public Deserializer<String> deserializer() {
+ return new StringDeserializer();
}
public Admin admin() {
@@ -320,7 +315,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
public boolean isActive(Integer brokerId) {
- return harness.aliveBrokers().exists(b -> b.config().brokerId() ==
brokerId);
+ return cluster.aliveBrokers().containsKey(brokerId);
}
public boolean isAssignedReplica(TopicPartition topicPartition, Integer
replicaId)
@@ -334,7 +329,7 @@ public final class TieredStorageTestContext implements
AutoCloseable {
}
public Optional<UnifiedLog> log(Integer brokerId, TopicPartition
partition) {
- return
harness.brokers().apply(brokerId).logManager().getLog(partition, false);
+ return cluster.brokers().get(brokerId).logManager().getLog(partition,
false);
}
public void succeed(TieredStorageTestAction action) {
@@ -351,7 +346,9 @@ public final class TieredStorageTestContext implements
AutoCloseable {
@Override
public void close() throws IOException {
- // IntegrationTestHarness closes the clients on tearDown, no need to
close them explicitly.
+ Utils.closeQuietly(producer, "Producer client");
+ Utils.closeQuietly(consumer, "Consumer client");
+ Utils.closeQuietly(admin, "Admin client");
}
private void reinitClients() {
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 40334a1726d..a25bffa5439 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -38,8 +38,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -92,7 +95,7 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
protected abstract void writeTestSpecifications(TieredStorageTestBuilder
builder);
- protected void overrideConsumerConfig(Properties consumerConfig) {
+ protected void overrideConsumerConfig(Map<String, Object> consumerConfig) {
}
@BeforeEach
@@ -101,8 +104,9 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
testClassName =
testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" +
testClassName).getAbsolutePath();
super.setUp(testInfo);
- overrideConsumerConfig(consumerConfig());
- context = new TieredStorageTestContext(this);
+ Map<String, Object> extraConsumerProps = new HashMap<>();
+ overrideConsumerConfig(extraConsumerProps);
+ context = new TieredStorageTestContext(new
HarnessBackedClusterInstance(this), extraConsumerProps);
}
// NOTE: Not able to refer
TestInfoUtils#TestWithParameterizedGroupProtocolNames() in the
ParameterizedTest name.
@@ -132,9 +136,9 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
}
}
- public static List<LocalTieredStorage>
remoteStorageManagers(Seq<KafkaBroker> brokers) {
+ public static List<LocalTieredStorage>
remoteStorageManagers(Collection<KafkaBroker> brokers) {
List<LocalTieredStorage> storages = new ArrayList<>();
- CollectionConverters.asJava(brokers).forEach(broker -> {
+ brokers.forEach(broker -> {
if (broker.remoteLogManagerOpt().isDefined()) {
RemoteLogManager remoteLogManager =
broker.remoteLogManagerOpt().get();
RemoteStorageManager storageManager =
remoteLogManager.storageManager();
@@ -153,8 +157,8 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
return storages;
}
- public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker>
brokers) {
- return CollectionConverters.asJava(brokers).stream()
+ public static List<BrokerLocalStorage>
localStorages(Collection<KafkaBroker> brokers) {
+ return brokers.stream()
.map(b -> new BrokerLocalStorage(b.config().brokerId(),
Set.copyOf(b.config().logDirs()),
STORAGE_WAIT_TIMEOUT_SEC))
.collect(Collectors.toList());
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
index 6f90d00401c..ec3a41ee1c2 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
@@ -65,7 +65,7 @@ public final class TieredStorageTestReport {
String lts = "";
if (!context.remoteStorageManagers().isEmpty()) {
LocalTieredStorage tieredStorage =
context.remoteStorageManagers().get(0);
- lts = DumpLocalTieredStorage.dump(tieredStorage, context.de(),
context.de());
+ lts = DumpLocalTieredStorage.dump(tieredStorage,
context.deserializer(), context.deserializer());
}
output.printf("Content of local tiered storage:%n%n%s%n", lts);
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
index 9f3cd9e3637..3f2a9ce1581 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -55,7 +55,7 @@ public final class OffloadAndTxnConsumeFromLeaderTest extends
TieredStorageTestH
}
@Override
- protected void overrideConsumerConfig(Properties consumerConfig) {
+ protected void overrideConsumerConfig(Map<String, Object> consumerConfig) {
consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString());
}