This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55a9d4069b3 KAFKA-17689 Change TieredStorageTestContext to use 
ClusterInstance (#22294)
55a9d4069b3 is described below

commit 55a9d4069b3ee6f10b5a8edfecaa1414e67c9a07
Author: PoAn Yang <[email protected]>
AuthorDate: Mon May 18 11:40:32 2026 +0900

    KAFKA-17689 Change TieredStorageTestContext to use ClusterInstance (#22294)
    
    Make TieredStorageTestContext support ClusterInstance.
    
    Reviewers: Ken Huang <[email protected]>, Murali Basani
     <[email protected]>, Chia-Ping Tsai   <[email protected]>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-storage.xml              |   1 +
 .../storage/HarnessBackedClusterInstance.java      | 196 +++++++++++++++++++++
 .../tiered/storage/TieredStorageTestContext.java   | 115 ++++++------
 .../tiered/storage/TieredStorageTestHarness.java   |  18 +-
 .../tiered/storage/TieredStorageTestReport.java    |   2 +-
 .../OffloadAndTxnConsumeFromLeaderTest.java        |   2 +-
 7 files changed, 267 insertions(+), 68 deletions(-)

diff --git a/build.gradle b/build.gradle
index 9cee79e0610..715fe2f4163 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2467,6 +2467,7 @@ project(':storage') {
     testImplementation project(':core')
     testImplementation project(':core').sourceSets.test.output
     testImplementation testFixtures(project(':storage:storage-api'))
+    testImplementation project(':metadata')
     testImplementation project(':test-common:test-common-internal-api')
     testImplementation project(':test-common:test-common-runtime')
     testImplementation project(':test-common:test-common-util')
diff --git a/checkstyle/import-control-storage.xml 
b/checkstyle/import-control-storage.xml
index 040e3761e37..3599ca5fffe 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -118,6 +118,7 @@
         <allow pkg="org.apache.kafka.tiered.storage" />
 
         <allow pkg="kafka.api" />
+        <allow pkg="kafka.integration" />
         <allow pkg="kafka.log" />
         <allow pkg="kafka.server" />
         <allow pkg="kafka.utils" />
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java
new file mode 100644
index 00000000000..45726f9dfc7
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import kafka.integration.KafkaServerTestHarness;
+import kafka.server.ControllerServer;
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.common.network.ConnectionMode;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.fault.FaultHandlerException;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import scala.jdk.javaapi.CollectionConverters;
+
+/**
+ * A {@link ClusterInstance} implementation backed by a {@link 
KafkaServerTestHarness}.
+ * This allows {@link TieredStorageTestContext} to depend only on {@link 
ClusterInstance}
+ * rather than on the concrete harness class.
+ */
+public class HarnessBackedClusterInstance implements ClusterInstance {
+
+    private final KafkaServerTestHarness harness;
+
+    public HarnessBackedClusterInstance(KafkaServerTestHarness harness) {
+        this.harness = harness;
+    }
+
+    @Override
+    public Type type() {
+        return Type.KRAFT;
+    }
+
+    @Override
+    public ClusterConfig config() {
+        return ClusterConfig.defaultBuilder()
+                .setBrokers(harness.brokers().size())
+                .setControllers(harness.controllerServers().size())
+                .setBrokerSecurityProtocol(harness.securityProtocol())
+                .setBrokerListenerName(harness.listenerName())
+                .setControllerListenerName(controllerListenerName())
+                .setMetadataVersion(harness.metadataVersion())
+                .build();
+    }
+
+    @Override
+    public Set<Integer> controllerIds() {
+        return controllers().keySet();
+    }
+
+    @Override
+    public ListenerName clientListener() {
+        return harness.listenerName();
+    }
+
+    @Override
+    public ListenerName controllerListenerName() {
+        return controllers().values().stream()
+                .map(c -> new 
ListenerName(c.config().controllerListenerNames().get(0)))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No controllers 
available"));
+    }
+
+    @Override
+    public String bootstrapServers() {
+        return harness.bootstrapServers(harness.listenerName());
+    }
+
+    @Override
+    public String bootstrapControllers() {
+        throw new UnsupportedOperationException("bootstrapControllers() is not 
supported in HarnessBackedClusterInstance");
+    }
+
+    @Override
+    public String clusterId() {
+        return brokers().values().stream()
+                .map(KafkaBroker::clusterId)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No brokers 
available"));
+    }
+
+    @Override
+    public Map<Integer, KafkaBroker> brokers() {
+        return CollectionConverters.asJava(harness.brokers()).stream()
+                .collect(Collectors.toMap(b -> b.config().brokerId(), b -> b));
+    }
+
+    @Override
+    public Map<Integer, ControllerServer> controllers() {
+        return 
CollectionConverters.asJava(harness.controllerServers()).stream()
+                .collect(Collectors.toMap(c -> c.config().nodeId(), c -> c));
+    }
+
+    @Override
+    public void shutdownBroker(int brokerId) {
+        harness.killBroker(brokerId);
+    }
+
+    @Override
+    public void startBroker(int brokerId) {
+        harness.startBroker(brokerId);
+    }
+
+    @Override
+    public void start() {
+        throw new UnsupportedOperationException("start() is managed by 
KafkaServerTestHarness");
+    }
+
+    @Override
+    public boolean started() {
+        return true;
+    }
+
+    @Override
+    public void stop() {
+        throw new UnsupportedOperationException("stop() is managed by 
KafkaServerTestHarness");
+    }
+
+    @Override
+    public boolean stopped() {
+        return false;
+    }
+
+    @Override
+    public void waitForReadyBrokers() throws InterruptedException {
+        Map<Integer, KafkaBroker> brokerMap = brokers();
+
+        // Step 1: wait until a controller marks all brokers as registered and 
unfenced
+        ControllerServer controllerServer = 
controllers().values().iterator().next();
+        try {
+            
controllerServer.controller().waitForReadyBrokers(brokerMap.size()).get();
+        } catch (ExecutionException e) {
+            throw new AssertionError("Failed while waiting for brokers to 
become ready", e);
+        }
+
+        // Step 2: wait until each broker's metadata cache knows about all 
alive brokers
+        Set<Integer> brokerIds = brokerMap.keySet();
+        TestUtils.waitForCondition(
+            () -> brokerMap.values().stream().allMatch(
+                broker -> brokerIds.stream().allMatch(id -> 
broker.metadataCache().hasAliveBroker(id))
+            ),
+            "Timed out waiting for metadata cache to reflect all alive brokers"
+        );
+    }
+
+    @Override
+    public Optional<FaultHandlerException> firstFatalException() {
+        return Optional.ofNullable(harness.faultHandler().firstException());
+    }
+
+    @Override
+    public Optional<FaultHandlerException> firstNonFatalException() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Map<String, Object> setClientSslConfig(Map<String, Object> configs) 
{
+        if (harness.trustStoreFile().isEmpty()) {
+            return configs;
+        }
+        try {
+            Map<String, Object> props = new HashMap<>(configs);
+            props.putAll(new 
TestSslUtils.SslConfigsBuilder(ConnectionMode.CLIENT)
+                    .useExistingTrustStore(harness.trustStoreFile().get())
+                    .build());
+            return props;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to build client SSL config", e);
+        }
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 9a7e94b2058..428cc5f7662 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.tiered.storage;
 
-import kafka.utils.TestUtils;
-
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigsOptions;
@@ -27,24 +24,25 @@ import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
 import org.apache.kafka.tiered.storage.specs.TopicSpec;
 import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@@ -52,66 +50,62 @@ import 
org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
-import scala.Function0;
-import scala.Function1;
-import scala.jdk.javaapi.CollectionConverters;
-
 import static 
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
 
 public final class TieredStorageTestContext implements AutoCloseable {
 
-    private final TieredStorageTestHarness harness;
-    private final Serializer<String> ser = new StringSerializer();
-    private final Deserializer<String> de = new StringDeserializer();
+    private final ClusterInstance cluster;
+    private final Map<String, Object> extraConsumerProps;
     private final Map<String, TopicSpec> topicSpecs = new HashMap<>();
     private final TieredStorageTestReport testReport;
 
-    private volatile KafkaProducer<String, String> producer;
+    private volatile Producer<String, String> producer;
     private volatile Consumer<String, String> consumer;
     private volatile Admin admin;
     private volatile List<LocalTieredStorage> remoteStorageManagers;
     private volatile List<BrokerLocalStorage> localStorages;
 
-    public TieredStorageTestContext(TieredStorageTestHarness harness) {
-        this.harness = harness;
+    public TieredStorageTestContext(ClusterInstance cluster, Map<String, 
Object> extraConsumerProps) {
+        this.cluster = cluster;
+        this.extraConsumerProps = extraConsumerProps;
         this.testReport = new TieredStorageTestReport(this);
         initClients();
         initContext();
     }
 
     private void initClients() {
-        // rediscover the new bootstrap-server port in case of broker restarts
-        ListenerName listenerName = harness.listenerName();
-        Properties commonOverrideProps = new Properties();
-        commonOverrideProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
harness.bootstrapServers(listenerName));
-
         // Set a producer linger of 60 seconds, in order to optimistically 
generate batches of
         // records with a pre-determined size.
-        Properties producerOverrideProps = new Properties();
-        producerOverrideProps.put(LINGER_MS_CONFIG, 
String.valueOf(TimeUnit.SECONDS.toMillis(60)));
-        producerOverrideProps.putAll(commonOverrideProps);
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(LINGER_MS_CONFIG, 
String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        consumerProps.putAll(extraConsumerProps);
 
-        producer = harness.createProducer(ser, ser, producerOverrideProps);
-        consumer = harness.createConsumer(de, de, commonOverrideProps,
-                CollectionConverters.asScala(List.<String>of()).toList());
-        admin = harness.createAdminClient(listenerName, commonOverrideProps);
+        producer = cluster.producer(producerProps);
+        consumer = cluster.consumer(consumerProps);
+        admin = cluster.admin(Map.of());
     }
 
     private void initContext() {
-        remoteStorageManagers = 
TieredStorageTestHarness.remoteStorageManagers(harness.aliveBrokers());
-        localStorages = 
TieredStorageTestHarness.localStorages(harness.aliveBrokers());
+        remoteStorageManagers = 
TieredStorageTestHarness.remoteStorageManagers(cluster.aliveBrokers().values());
+        localStorages = 
TieredStorageTestHarness.localStorages(cluster.aliveBrokers().values());
     }
 
     public void createTopic(TopicSpec spec) throws ExecutionException, 
InterruptedException {
@@ -124,7 +118,7 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
         }
         newTopic.configs(spec.properties());
         admin.createTopics(List.of(newTopic)).all().get();
-        TestUtils.waitForAllPartitionsMetadata(harness.brokers(), 
spec.topicName(), spec.partitionCount());
+        cluster.waitTopicCreation(spec.topicName(), spec.partitionCount());
         synchronized (this) {
             topicSpecs.put(spec.topicName(), spec);
         }
@@ -144,7 +138,7 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
         }
         Map<String, NewPartitions> partitionsMap = Map.of(spec.topicName(), 
newPartitions);
         admin.createPartitions(partitionsMap).all().get();
-        TestUtils.waitForAllPartitionsMetadata(harness.brokers(), 
spec.topicName(), spec.partitionCount());
+        cluster.waitTopicCreation(spec.topicName(), spec.partitionCount());
     }
 
     public void updateTopicConfig(String topic,
@@ -178,8 +172,8 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
         admin.incrementalAlterConfigs(configsMap, alterOptions).all().get(30, 
TimeUnit.SECONDS);
     }
 
-    public void deleteTopic(String topic) {
-        TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers(), 
harness.controllerServers());
+    public void deleteTopic(String topic) throws InterruptedException, 
ExecutionException {
+        admin.deleteTopics(List.of(topic)).all().get();
     }
 
     /**
@@ -202,7 +196,7 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
 
     public List<ConsumerRecord<String, String>> consume(TopicPartition 
topicPartition,
                                                         Integer 
expectedTotalCount,
-                                                        Long fetchOffset) {
+                                                        Long fetchOffset) 
throws InterruptedException {
         consumer.assign(List.of(topicPartition));
         consumer.seek(topicPartition, fetchOffset);
 
@@ -210,15 +204,16 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
         long pollTimeoutMs = 100L;
         String sep = System.lineSeparator();
         List<ConsumerRecord<String, String>> records = new ArrayList<>();
-        Function1<ConsumerRecords<String, String>, Object> pollAction = 
polledRecords -> {
-            polledRecords.forEach(records::add);
-            return records.size() >= expectedTotalCount;
-        };
-        Function0<String> messageSupplier = () ->
-                String.format("Could not consume %d records of %s from offset 
%d in %d ms. %d message(s) consumed:%s%s",
-                        expectedTotalCount, topicPartition, fetchOffset, 
timeoutMs, records.size(), sep,
-                        
records.stream().map(Object::toString).collect(Collectors.joining(sep)));
-        TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, 
timeoutMs, pollTimeoutMs);
+        TestUtils.waitForCondition(
+            () -> {
+                
consumer.poll(Duration.ofMillis(pollTimeoutMs)).forEach(records::add);
+                return records.size() >= expectedTotalCount;
+            },
+            timeoutMs,
+            () -> String.format("Could not consume %d records of %s from 
offset %d in %d ms. %d message(s) consumed:%s%s",
+                    expectedTotalCount, topicPartition, fetchOffset, 
timeoutMs, records.size(), sep,
+                    
records.stream().map(Object::toString).collect(Collectors.joining(sep)))
+        );
         return records;
     }
 
@@ -237,9 +232,9 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
     }
 
     public void bounce(int brokerId) {
-        harness.killBroker(brokerId);
-        boolean allBrokersDead = harness.aliveBrokers().isEmpty();
-        harness.startBroker(brokerId);
+        cluster.shutdownBroker(brokerId);
+        boolean allBrokersDead = cluster.aliveBrokers().isEmpty();
+        cluster.startBroker(brokerId);
         if (allBrokersDead) {
             reinitClients();
         }
@@ -247,13 +242,13 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
     }
 
     public void stop(int brokerId) {
-        harness.killBroker(brokerId);
+        cluster.shutdownBroker(brokerId);
         initContext();
     }
 
     public void start(int brokerId) {
-        boolean allBrokersDead = harness.aliveBrokers().isEmpty();
-        harness.startBroker(brokerId);
+        boolean allBrokersDead = cluster.aliveBrokers().isEmpty();
+        cluster.startBroker(brokerId);
         if (allBrokersDead) {
             reinitClients();
         }
@@ -265,7 +260,7 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
                                    boolean isStopped) throws IOException {
         BrokerLocalStorage brokerLocalStorage;
         if (isStopped) {
-            brokerLocalStorage = 
TieredStorageTestHarness.localStorages(harness.brokers())
+            brokerLocalStorage = 
TieredStorageTestHarness.localStorages(cluster.brokers().values())
                     .stream()
                     .filter(bls -> bls.getBrokerId() == brokerId)
                     .findFirst()
@@ -283,7 +278,7 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
     }
 
     public LocalTieredStorageSnapshot takeTieredStorageSnapshot() {
-        int aliveBrokerId = harness.aliveBrokers().head().config().brokerId();
+        int aliveBrokerId = 
cluster.aliveBrokers().values().iterator().next().config().brokerId();
         return 
LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManager(aliveBrokerId));
     }
 
@@ -311,8 +306,8 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
         return localStorages;
     }
 
-    public Deserializer<String> de() {
-        return de;
+    public Deserializer<String> deserializer() {
+        return new StringDeserializer();
     }
 
     public Admin admin() {
@@ -320,7 +315,7 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
     }
 
     public boolean isActive(Integer brokerId) {
-        return harness.aliveBrokers().exists(b -> b.config().brokerId() == 
brokerId);
+        return cluster.aliveBrokers().containsKey(brokerId);
     }
 
     public boolean isAssignedReplica(TopicPartition topicPartition, Integer 
replicaId)
@@ -334,7 +329,7 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
     }
 
     public Optional<UnifiedLog> log(Integer brokerId, TopicPartition 
partition) {
-        return 
harness.brokers().apply(brokerId).logManager().getLog(partition, false);
+        return cluster.brokers().get(brokerId).logManager().getLog(partition, 
false);
     }
 
     public void succeed(TieredStorageTestAction action) {
@@ -351,7 +346,9 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
 
     @Override
     public void close() throws IOException {
-        // IntegrationTestHarness closes the clients on tearDown, no need to 
close them explicitly.
+        Utils.closeQuietly(producer, "Producer client");
+        Utils.closeQuietly(consumer, "Consumer client");
+        Utils.closeQuietly(admin, "Admin client");
     }
 
     private void reinitClients() {
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 40334a1726d..a25bffa5439 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -38,8 +38,11 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -92,7 +95,7 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
 
     protected abstract void writeTestSpecifications(TieredStorageTestBuilder 
builder);
 
-    protected void overrideConsumerConfig(Properties consumerConfig) {
+    protected void overrideConsumerConfig(Map<String, Object> consumerConfig) {
     }
 
     @BeforeEach
@@ -101,8 +104,9 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
         testClassName = 
testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
         storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + 
testClassName).getAbsolutePath();
         super.setUp(testInfo);
-        overrideConsumerConfig(consumerConfig());
-        context = new TieredStorageTestContext(this);
+        Map<String, Object> extraConsumerProps = new HashMap<>();
+        overrideConsumerConfig(extraConsumerProps);
+        context = new TieredStorageTestContext(new 
HarnessBackedClusterInstance(this), extraConsumerProps);
     }
 
     // NOTE: Not able to refer 
TestInfoUtils#TestWithParameterizedGroupProtocolNames() in the 
ParameterizedTest name.
@@ -132,9 +136,9 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
         }
     }
 
-    public static List<LocalTieredStorage> 
remoteStorageManagers(Seq<KafkaBroker> brokers) {
+    public static List<LocalTieredStorage> 
remoteStorageManagers(Collection<KafkaBroker> brokers) {
         List<LocalTieredStorage> storages = new ArrayList<>();
-        CollectionConverters.asJava(brokers).forEach(broker -> {
+        brokers.forEach(broker -> {
             if (broker.remoteLogManagerOpt().isDefined()) {
                 RemoteLogManager remoteLogManager = 
broker.remoteLogManagerOpt().get();
                 RemoteStorageManager storageManager = 
remoteLogManager.storageManager();
@@ -153,8 +157,8 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
         return storages;
     }
 
-    public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker> 
brokers) {
-        return CollectionConverters.asJava(brokers).stream()
+    public static List<BrokerLocalStorage> 
localStorages(Collection<KafkaBroker> brokers) {
+        return brokers.stream()
                 .map(b -> new BrokerLocalStorage(b.config().brokerId(), 
Set.copyOf(b.config().logDirs()),
                         STORAGE_WAIT_TIMEOUT_SEC))
                 .collect(Collectors.toList());
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
index 6f90d00401c..ec3a41ee1c2 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java
@@ -65,7 +65,7 @@ public final class TieredStorageTestReport {
         String lts = "";
         if (!context.remoteStorageManagers().isEmpty()) {
             LocalTieredStorage tieredStorage = 
context.remoteStorageManagers().get(0);
-            lts = DumpLocalTieredStorage.dump(tieredStorage, context.de(), 
context.de());
+            lts = DumpLocalTieredStorage.dump(tieredStorage, 
context.deserializer(), context.deserializer());
         }
         output.printf("Content of local tiered storage:%n%n%s%n", lts);
     }
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
index 9f3cd9e3637..3f2a9ce1581 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -55,7 +55,7 @@ public final class OffloadAndTxnConsumeFromLeaderTest extends 
TieredStorageTestH
     }
 
     @Override
-    protected void overrideConsumerConfig(Properties consumerConfig) {
+    protected void overrideConsumerConfig(Map<String, Object> consumerConfig) {
         consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString());
     }
 

Reply via email to