This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1854d4b8a11 KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams integration tests from EmbeddedZookeeper to KRaft (#17016)
1854d4b8a11 is described below

commit 1854d4b8a11461b53b59fa109b95f2a4f5003997
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Fri Sep 27 20:49:12 2024 +0100

    KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams integration tests from EmbeddedZookeeper to KRaft (#17016)
    
    Migrate the EmbeddedKafkaCluster from EmbeddedZookeeper to KRaft
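
    The recurring pattern in the diff below: the blocking "...AndWait" topic
    deletion helpers collapse into deleteTopic()/deleteTopics()/deleteAllTopics(),
    and bare streams.start() calls become
    IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams). A minimal
    before/after sketch of that pattern (the surrounding test scaffolding here is
    assumed, not part of this commit):

        // before: ZK-backed cluster, fire-and-forget start
        CLUSTER.deleteAllTopicsAndWait(120_000L);
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        streams.start();

        // after: KRaft-backed cluster; the renamed helpers drop the explicit
        // timeouts, and tests block until the app reaches RUNNING
        CLUSTER.deleteAllTopics();
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);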
    
    Reviewers: Bill Bejeck <[email protected]>
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   2 +
 .../integration/AbstractResetIntegrationTest.java  |  13 +-
 .../integration/AdjustStreamThreadCountTest.java   |   2 +-
 .../streams/integration/EosIntegrationTest.java    |   3 +-
 ...ighAvailabilityTaskAssignorIntegrationTest.java |  21 +-
 .../integration/IQv2StoreIntegrationTest.java      |   2 +-
 .../JoinGracePeriodDurabilityIntegrationTest.java  |  11 +-
 .../integration/JoinStoreIntegrationTest.java      |   2 +-
 .../JoinWithIncompleteMetadataIntegrationTest.java |   2 +-
 .../KTableSourceTopicRestartIntegrationTest.java   |  16 +-
 .../KafkaStreamsCloseOptionsIntegrationTest.java   |   2 +-
 .../integration/NamedTopologyIntegrationTest.java  |  28 +-
 .../PositionRestartIntegrationTest.java            |   4 +-
 .../integration/QueryableStateIntegrationTest.java |   2 +-
 .../integration/RangeQueryIntegrationTest.java     |   2 +-
 .../integration/RegexSourceIntegrationTest.java    |   6 +-
 .../streams/integration/ResetIntegrationTest.java  |  28 +-
 .../integration/ResetIntegrationWithSslTest.java   |   6 +-
 .../ResetPartitionTimeIntegrationTest.java         |  15 +-
 .../integration/RestoreIntegrationTest.java        |   2 +-
 .../integration/RocksDBMetricsIntegrationTest.java |   2 +-
 .../integration/StandbyTaskEOSIntegrationTest.java |   2 +-
 ...tandbyTaskEOSMultiRebalanceIntegrationTest.java |   6 +-
 .../integration/StateDirectoryIntegrationTest.java |   4 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java |  18 +-
 .../SuppressionDurabilityIntegrationTest.java      |  37 +-
 .../integration/SuppressionIntegrationTest.java    |  11 +-
 .../integration/TaskMetadataIntegrationTest.java   |   2 +-
 .../integration/utils/EmbeddedKafkaCluster.java    | 473 ++++++++++++---------
 .../integration/utils/IntegrationTestUtils.java    |  36 +-
 .../streams/integration/utils/KafkaEmbedded.java   | 223 ----------
 ...HandlingSourceTopicDeletionIntegrationTest.java |   2 +-
 33 files changed, 395 insertions(+), 591 deletions(-)

diff --git a/build.gradle b/build.gradle
index 0907f0506a4..0cd97d23186 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2528,6 +2528,7 @@ project(':streams') {
     // testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle
     testCompileOnly project(':streams:test-utils')
 
+    testImplementation project(':metadata')
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':server')
     testImplementation project(':core')
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f7d7ef798bd..487d49f0466 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -393,6 +393,8 @@
     </subpackage>
 
     <subpackage name="integration">
+      <allow pkg="kafka.testkit"/>
+      <allow pkg="org.apache.kafka.metadata"/>
       <allow pkg="kafka.admin" />
       <allow pkg="kafka.api" />
       <allow pkg="kafka.cluster" />
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index b92e2ad135a..293c5f5d286 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -150,7 +150,7 @@ public abstract class AbstractResetIntegrationTest {
 
     protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
     protected static final int CLEANUP_CONSUMER_TIMEOUT = 2000;
-    protected static final int TIMEOUT_MULTIPLIER = 15;
+    protected static final int TIMEOUT_MULTIPLIER = 30;
 
     void prepareTest(final TestInfo testInfo) throws Exception {
         final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
@@ -159,7 +159,7 @@ public abstract class AbstractResetIntegrationTest {
 
         waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT);
 
-        cluster.deleteAllTopicsAndWait(120000);
+        cluster.deleteAllTopics();
         cluster.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
 
         add10InputElements();
@@ -199,7 +199,7 @@ public abstract class AbstractResetIntegrationTest {
 
         // RUN
         streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
 
         streams.close();
@@ -272,7 +272,7 @@ public abstract class AbstractResetIntegrationTest {
 
         // RUN
         streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig);
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         // receive only first values to make sure intermediate user topic is not consumed completely
         // => required to test "seekToEnd" for intermediate topics
@@ -301,7 +301,7 @@ public abstract class AbstractResetIntegrationTest {
         assertInternalTopicsGotDeleted(useRepartitioned ? null : INTERMEDIATE_USER_TOPIC);
 
         // RE-RUN
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
         streams.close();
@@ -323,7 +323,7 @@ public abstract class AbstractResetIntegrationTest {
         cleanGlobal(!useRepartitioned, null, null, appID);
 
         if (!useRepartitioned) {
-            cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
+            cluster.deleteTopic(INTERMEDIATE_USER_TOPIC);
         }
     }
 
@@ -420,7 +420,6 @@ public abstract class AbstractResetIntegrationTest {
     }
 
     protected void assertInternalTopicsGotDeleted(final String additionalExistingTopic) throws Exception {
-        // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
         if (additionalExistingTopic != null) {
             cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
                     Topic.GROUP_METADATA_TOPIC_NAME, additionalExistingTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index 4feaa2c6283..916940ce9bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -107,7 +107,7 @@ public class AdjustStreamThreadCountTest {
         builder = new StreamsBuilder();
         builder.stream(inputTopic);
 
-        properties  = mkObjectProperties(
+        properties = mkObjectProperties(
             mkMap(
                 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index b5651e889ef..2b924e03727 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -162,8 +162,7 @@ public class EosIntegrationTest {
     @BeforeEach
     public void createTopics() throws Exception {
         applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
-        CLUSTER.deleteTopicsAndWait(
-            60_000L,
+        CLUSTER.deleteTopics(
             SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
             SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC,
             SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 52a3e839c21..b50cf4ad62c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -67,7 +67,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
-import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
@@ -81,19 +80,11 @@ import static org.hamcrest.Matchers.is;
 @Tag("integration")
 public class HighAvailabilityTaskAssignorIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3,
-        new Properties(),
-        asList(
-            new Properties() {{
-                    setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0);
-                }},
-            new Properties() {{
-                    setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1);
-                }},
-            new Properties() {{
-                    setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2);
-                }}
-        )
-    );
+        new Properties(), mkMap(
+            mkEntry(0, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0))),
+            mkEntry(1, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1))),
+            mkEntry(2, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2)))
+    ));
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -258,7 +249,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
 
             restoreCompleteLatch.await();
-            // We should finalize the restoration without having restored any records (because they're already in
-            // the store. Otherwise, we failed to properly re-use the state from the standby.
+            // We should finalize the restoration without having restored any records (because they're already in
+            // the store). Otherwise, we failed to properly re-use the state from the standby.
             assertThat(instance1TotalRestored.get(), is(0L));
             // Belt-and-suspenders check that we never even attempt to restore any records.
             assertThat(instance1NumRestored.get(), is(-1L));
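
Note on the hunk above: the per-broker override parameter of EmbeddedKafkaCluster
changes shape, from a List<Properties> ordered by broker position to a
Map<Integer, Map<String, String>> keyed by broker id. A minimal sketch of the new
form (mkMap/mkEntry are the existing org.apache.kafka.common.utils.Utils helpers;
the rack values are illustrative):

    // broker id -> config overrides for that broker
    new EmbeddedKafkaCluster(3, new Properties(), mkMap(
        mkEntry(0, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, "rack-0"))),
        mkEntry(1, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, "rack-1"))),
        mkEntry(2, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, "rack-2")))
    ));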
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 425f1eb207f..be31768aa4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -379,7 +379,7 @@ public class IQv2StoreIntegrationTest {
         throws InterruptedException, IOException, ExecutionException, TimeoutException {
 
         CLUSTER.start();
-        CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
+        CLUSTER.deleteAllTopics();
         final int partitions = 2;
         CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index 1ac11194f5f..61cbca6e3d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -70,12 +71,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @Timeout(600)
 public class JoinGracePeriodDurabilityIntegrationTest {
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
-        3,
-        mkProperties(mkMap()),
-        0L
-    );
-
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+    private static final long NOW = Instant.now().toEpochMilli();
     @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
@@ -218,7 +215,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
      * just to exercise that everything works properly in the presence of commits.
      */
     private long scaledTime(final long unscaledTime) {
-        return COMMIT_INTERVAL * 2 * unscaledTime;
+        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
     private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index c71ac31672a..1041f323ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -97,7 +97,7 @@ public class JoinStoreIntegrationTest {
 
     @AfterEach
     public void cleanup() throws InterruptedException, IOException {
-        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.deleteAllTopics();
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 58b00f81d53..c3059011cbf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -84,7 +84,7 @@ public class JoinWithIncompleteMetadataIntegrationTest {
 
     @AfterEach
     public void cleanup() throws InterruptedException, IOException {
-        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.deleteAllTopics();
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 35677aeaef2..5f742d95b3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -120,8 +120,7 @@ public class KTableSourceTopicRestartIntegrationTest {
     @Test
     public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
         try {
-            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streams.start();
+            streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);
 
             produceKeyValues("a", "b", "c");
 
@@ -131,7 +130,7 @@ public class KTableSourceTopicRestartIntegrationTest {
             streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
             // the state restore listener will append one record to the log
             streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
-            streams.start();
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             produceKeyValues("f", "g", "h");
 
@@ -149,8 +148,7 @@ public class KTableSourceTopicRestartIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
 
         try {
-            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streams.start();
+            streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);
 
             produceKeyValues("a", "b", "c");
 
@@ -160,7 +158,7 @@ public class KTableSourceTopicRestartIntegrationTest {
             streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
             // the state restore listener will append one record to the log
             streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
-            streams.start();
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             produceKeyValues("f", "g", "h");
 
@@ -176,16 +174,14 @@ public class KTableSourceTopicRestartIntegrationTest {
     @Test
     public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
         try {
-            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streams.start();
+            streams = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, streamsBuilder, false);
 
             produceKeyValues("a", "b", "c");
 
             assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
 
             streams.close();
-            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streams.start();
+            streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);
 
             produceKeyValues("f", "g", "h");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index d7ce484ba55..bcbc36f3152 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -135,7 +135,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
             adminClient = Admin.create(commonClientConfig);
         }
 
-        CLUSTER.deleteAllTopicsAndWait(120_000L);
+        CLUSTER.deleteAllTopics();
         CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
         CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 4e341b07234..5f0fd659db7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -97,7 +97,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @Timeout(600)
 @Tag("integration")
 public class NamedTopologyIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
 
     private static final String TOPOLOGY_1 = "topology-1";
     private static final String TOPOLOGY_2 = "topology-2";
@@ -243,14 +243,14 @@ public class NamedTopologyIntegrationTest {
         CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
             try {
                 assertThat("topic was not decorated", t.contains(TOPIC_PREFIX));
-                CLUSTER.deleteTopicsAndWait(t);
-            } catch (final InterruptedException e) {
+                CLUSTER.deleteTopics(t);
+            } catch (final RuntimeException e) {
                 e.printStackTrace();
             }
         });
 
-        CLUSTER.deleteTopicsAndWait(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
-        CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+        CLUSTER.deleteTopics(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
+        CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
     @Test
@@ -518,8 +518,8 @@ public class NamedTopologyIntegrationTest {
 
         CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog")).forEach(t -> {
             try {
-                CLUSTER.deleteTopicAndWait(t);
-            } catch (final InterruptedException e) {
+                CLUSTER.deleteTopic(t);
+            } catch (final RuntimeException e) {
                 e.printStackTrace();
             }
         });
@@ -570,7 +570,7 @@ public class NamedTopologyIntegrationTest {
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
         } finally {
-            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+            CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
             CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
         }
     }
@@ -624,8 +624,8 @@ public class NamedTopologyIntegrationTest {
 
             CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> {
                 try {
-                    CLUSTER.deleteTopicAndWait(t);
-                } catch (final InterruptedException e) {
+                    CLUSTER.deleteTopic(t);
+                } catch (final RuntimeException e) {
                     e.printStackTrace();
                 }
             });
@@ -640,7 +640,7 @@ public class NamedTopologyIntegrationTest {
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
         } finally {
-            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+            CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
         }
     }
 
@@ -662,8 +662,8 @@ public class NamedTopologyIntegrationTest {
 
         CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
             try {
-                CLUSTER.deleteTopicsAndWait(t);
-            } catch (final InterruptedException e) {
+                CLUSTER.deleteTopics(t);
+            } catch (final RuntimeException e) {
                 e.printStackTrace();
             }
         });
@@ -678,7 +678,7 @@ public class NamedTopologyIntegrationTest {
         assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
         assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
 
-        CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+        CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
index 044acfac8b0..bc21a2a8b6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
@@ -102,7 +102,7 @@ import static org.hamcrest.Matchers.is;
 public class PositionRestartIntegrationTest {
     private static final Logger LOG = LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
     private static final long SEED = new Random().nextLong();
-    private static final int NUM_BROKERS = 1;
+    private static final int NUM_BROKERS = 3;
     public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
     private static int port = 0;
     private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -274,7 +274,7 @@ public class PositionRestartIntegrationTest {
         throws InterruptedException, IOException, ExecutionException, TimeoutException {
 
         CLUSTER.start();
-        CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
+        CLUSTER.deleteAllTopics();
         final int partitions = 2;
         CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index f16c3e57c39..2f84b0b4f4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -242,7 +242,7 @@ public class QueryableStateIntegrationTest {
             kafkaStreams.close(ofSeconds(30));
         }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-        CLUSTER.deleteAllTopicsAndWait(0L);
+        CLUSTER.deleteAllTopics();
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index 477024dce53..6842c7718e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -153,7 +153,7 @@ public class RangeQueryIntegrationTest {
 
     @AfterEach
     public void cleanup() throws InterruptedException {
-        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.deleteAllTopics();
     }
 
     @ParameterizedTest
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index ae953020520..7fff8099a16 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -196,7 +196,7 @@ public class RegexSourceIntegrationTest {
 
             streams.close();
         } finally {
-            CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+            CLUSTER.deleteTopics("TEST-TOPIC-1", "TEST-TOPIC-2");
         }
     }
 
@@ -248,7 +248,7 @@ public class RegexSourceIntegrationTest {
 
             streams.close();
         } finally {
-            CLUSTER.deleteTopicsAndWait(topic1, topic2);
+            CLUSTER.deleteTopics(topic1, topic2);
         }
     }
 
@@ -290,7 +290,7 @@ public class RegexSourceIntegrationTest {
             streams.start();
             TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
         } finally {
-            CLUSTER.deleteTopicAndWait("TEST-TOPIC-A");
+            CLUSTER.deleteTopic("TEST-TOPIC-A");
         }
 
         TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 1c8599a3b19..13856c58dc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -68,7 +68,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
         // very long sleep times
         brokerProps.put(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, -1L);
-        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+        CLUSTER = new EmbeddedKafkaCluster(3, brokerProps);
     }
 
     @BeforeAll
@@ -98,7 +98,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
     }
 
     @Test
-    public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) {
+    public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) throws Exception {
         final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
         final String[] parameters = new String[] {
             "--application-id", appID,
@@ -113,7 +113,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
         // RUN
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
         final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
         assertEquals(1, exitCode);
@@ -193,7 +193,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
         // Run
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
 
         streams.close();
@@ -213,7 +214,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         assertInternalTopicsGotDeleted(null);
 
         // RE-RUN
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
@@ -228,7 +229,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
         // RUN
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
 
         streams.close();
@@ -251,7 +253,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         resetFile.deleteOnExit();
 
         // RE-RUN
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 5);
         streams.close();
 
@@ -269,7 +271,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
         // RUN
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
 
         streams.close();
@@ -297,7 +300,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         resetFile.deleteOnExit();
 
         // RE-RUN
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
@@ -314,7 +318,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
         // RUN
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
 
         streams.close();
@@ -337,7 +342,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         resetFile.deleteOnExit();
 
         // RE-RUN
-        streams.start();
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index 65a99a0b1b5..7f47233361e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -33,8 +33,6 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-
 /**
  * Tests command line SSL setup for reset tool.
  */
@@ -54,9 +52,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
 
         try {
             SSL_CONFIG = TestSslUtils.createSslConfig(false, true, ConnectionMode.SERVER, TestUtils.tempFile(), "testCert");
-
-            brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:0");
-            brokerProps.put(INTER_BROKER_LISTENER_NAME_CONFIG, "SSL");
+            brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,CONTROLLER:SSL,INTERNAL:SSL");
             brokerProps.putAll(SSL_CONFIG);
         } catch (final Exception e) {
             throw new RuntimeException(e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index a0cedcb2840..cff0d74fcad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -44,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -64,13 +65,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class ResetPartitionTimeIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
+    private static final long NOW = Instant.now().toEpochMilli();
+
     static {
         BROKER_CONFIG = new Properties();
         BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
         BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
     }
     public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L);
+        new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -117,13 +120,13 @@ public class ResetPartitionTimeIntegrationTest {
             produceSynchronouslyToPartitionZero(
                 input,
                 Collections.singletonList(
-                    new KeyValueTimestamp<>("k3", "v3", 5000)
+                    new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
                 )
             );
             verifyOutput(
                 outputRaw,
                 Collections.singletonList(
-                    new KeyValueTimestamp<>("k3", "v3", 5000)
+                    new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
                 )
             );
             assertThat(lastRecordedTimestamp, is(-1L));
@@ -138,16 +141,16 @@ public class ResetPartitionTimeIntegrationTest {
             produceSynchronouslyToPartitionZero(
                 input,
                 Collections.singletonList(
-                    new KeyValueTimestamp<>("k5", "v5", 4999)
+                    new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
                 )
             );
             verifyOutput(
                 outputRaw,
                 Collections.singletonList(
-                    new KeyValueTimestamp<>("k5", "v5", 4999)
+                    new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
                 )
             );
-            assertThat(lastRecordedTimestamp, is(5000L));
+            assertThat(lastRecordedTimestamp, is(NOW + 5000L));
         } finally {
             kafkaStreams.close();
             quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
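
Note on the hunk above: the timing-sensitive tests (here and in the suppression
tests further down) now anchor record timestamps to wall-clock time via
Instant.now().toEpochMilli() rather than using small epoch-relative values. A
plausible reading, given the move to a real KRaft cluster without a mocked
clock: timestamps near epoch 0 sit far outside the brokers' default retention
window, so such records could be eligible for deletion as soon as they are
written. The pattern, as it appears in the diff:

    private static final long NOW = Instant.now().toEpochMilli();
    // timestamps are now NOW-relative instead of 0-relative
    new KeyValueTimestamp<>("k3", "v3", NOW + 5000);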
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 636c9c52f05..5fe61eed66e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -117,7 +117,7 @@ public class RestoreIntegrationTest {
 
     private static final Duration RESTORATION_DELAY = Duration.ofMillis(2000);
 
-    private static final int NUM_BROKERS = 1;
+    private static final int NUM_BROKERS = 2;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 5fd98bbd7d6..22ac5e5408e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -135,7 +135,7 @@ public class RocksDBMetricsIntegrationTest {
 
     @AfterEach
     public void after() throws Exception {
-        CLUSTER.deleteTopicsAndWait(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
+        CLUSTER.deleteTopics(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
     }
 
     @FunctionalInterface
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index f910043d1e0..1c3bd11957e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -104,7 +104,7 @@ public class StandbyTaskEOSIntegrationTest {
         inputTopic = "input-" + safeTestName;
         outputTopic = "output-" + safeTestName;
         storeName = "store-" + safeTestName;
-        CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
+        CLUSTER.deleteTopics(inputTopic, outputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
         CLUSTER.createTopic(inputTopic, 1, 3);
         CLUSTER.createTopic(outputTopic, 1, 3);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
index 66f5b5f0209..e4d355cdabe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
@@ -18,9 +18,11 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
@@ -99,7 +101,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest {
         storeName = "store-" + safeTestName;
         counterName = "counter-" + safeTestName;
 
-        CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic);
+        CLUSTER.deleteTopics(inputTopic, outputTopic);
         CLUSTER.createTopic(inputTopic, partitionCount, 3);
         CLUSTER.createTopic(outputTopic, partitionCount, 3);
     }
@@ -141,7 +143,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest {
                         CLUSTER.bootstrapServers(),
                         IntegerSerializer.class,
                         IntegerSerializer.class,
-                        new Properties()
+                        Utils.mkProperties(Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "all"))
                 ),
                 10L + time
         );
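
Note on the hunk above: the test producer now pins acks=all explicitly. A
plausible motivation, with topics created at replication factor 3 on the
multi-broker cluster: waiting for full ISR acknowledgement avoids losing
records across the rebalances this test provokes. The properties are built
with the existing Utils helper:

    Utils.mkProperties(Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "all"))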
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
index df4359228c6..3c3a3721748 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
@@ -171,7 +171,7 @@ public class StateDirectoryIntegrationTest {
                     ).findFirst().isPresent()
             );
         } finally {
-            CLUSTER.deleteAllTopicsAndWait(0L);
+            CLUSTER.deleteAllTopics();
         }
     }
 
@@ -271,7 +271,7 @@ public class StateDirectoryIntegrationTest {
             assertTrue((new File(stateDir)).exists());  // Root state store exists
             assertTrue(appDir.exists());    // Application state store exists
         } finally {
-            CLUSTER.deleteAllTopicsAndWait(0L);
+            CLUSTER.deleteAllTopics();
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 476c1857422..9ab5c7d574c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -54,6 +54,7 @@ import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -80,8 +81,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 @Tag("integration")
 @Timeout(600)
 public class StreamsUncaughtExceptionHandlerIntegrationTest {
+    private static final long NOW = Instant.now().toEpochMilli();
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -146,7 +148,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
             kafkaStreams.setUncaughtExceptionHandler((t, e) -> counter.incrementAndGet());
 
             startApplicationAndWaitUntilRunning(kafkaStreams);
-            produceMessages(0L, inputTopic, "A");
+            produceMessages(NOW, inputTopic, "A");
 
             // should call the UncaughtExceptionHandler in current thread
             TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
@@ -168,7 +170,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
 
             startApplicationAndWaitUntilRunning(kafkaStreams);
 
-            produceMessages(0L, inputTopic, "A");
+            produceMessages(NOW, inputTopic, "A");
             waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
             assertThat(processorValueCollector.size(), equalTo(1));
@@ -252,7 +254,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
 
             startApplicationAndWaitUntilRunning(kafkaStreams);
 
-            produceMessages(0L, inputTopic2, "A");
+            produceMessages(NOW, inputTopic2, "A");
             waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
             assertThat(processorValueCollector.size(), equalTo(1));
@@ -297,7 +299,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
                     IntegerSerializer.class,
                     StringSerializer.class,
                     new Properties()),
-                0L);
+                    NOW);
 
             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                 inputTopic2,
@@ -310,7 +312,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
                     IntegerSerializer.class,
                     StringSerializer.class,
                     new Properties()),
-                0L);
+                    NOW);
 
             IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                 TestUtils.consumerConfig(
@@ -365,7 +367,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
 
             startApplicationAndWaitUntilRunning(asList(kafkaStreams1, kafkaStreams2));
 
-            produceMessages(0L, inputTopic, "A");
+            produceMessages(NOW, inputTopic, "A");
             waitForApplicationState(asList(kafkaStreams1, kafkaStreams2), KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
             assertThat(processorValueCollector.size(), equalTo(1));
@@ -386,7 +388,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
             });
             startApplicationAndWaitUntilRunning(kafkaStreams);
 
-            produceMessages(0L, inputTopic, "A");
+            produceMessages(NOW, inputTopic, "A");
             TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
             TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
             kafkaStreams.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 9215a0095e1..47d37f76d8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -55,11 +55,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashSet;
+import java.time.Instant;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -82,12 +81,8 @@ import static org.hamcrest.Matchers.equalTo;
 @Tag("integration")
 @Timeout(600)
 public class SuppressionDurabilityIntegrationTest {
-
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
-        3,
-        mkProperties(mkMap()),
-        0L
-    );
+    private static final long NOW = Instant.now().toEpochMilli();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -173,11 +168,11 @@ public class SuppressionDurabilityIntegrationTest {
             );
             verifyOutput(
                 outputRaw,
-                new HashSet<>(asList(
+                asList(
                     new KeyValueTimestamp<>("k1", 1L, scaledTime(1L)),
                     new KeyValueTimestamp<>("k2", 1L, scaledTime(2L)),
                     new KeyValueTimestamp<>("k3", 1L, scaledTime(3L))
-                ))
+                )
             );
             assertThat(eventCount.get(), is(0));
 
@@ -191,10 +186,10 @@ public class SuppressionDurabilityIntegrationTest {
             );
             verifyOutput(
                 outputRaw,
-                new HashSet<>(asList(
+                asList(
                     new KeyValueTimestamp<>("k4", 1L, scaledTime(4L)),
                     new KeyValueTimestamp<>("k5", 1L, scaledTime(5L))
-                ))
+                )
             );
             assertThat(eventCount.get(), is(2));
             verifyOutput(
@@ -225,11 +220,11 @@ public class SuppressionDurabilityIntegrationTest {
             );
             verifyOutput(
                 outputRaw,
-                new HashSet<>(asList(
+                asList(
                     new KeyValueTimestamp<>("k6", 1L, scaledTime(6L)),
                     new KeyValueTimestamp<>("k7", 1L, scaledTime(7L)),
                     new KeyValueTimestamp<>("k8", 1L, scaledTime(8L))
-                ))
+                )
             );
             assertThat("suppress has apparently produced some duplicates. There should only be 5 output events.",
                        eventCount.get(), is(5));
@@ -303,24 +298,12 @@ public class SuppressionDurabilityIntegrationTest {
         IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
     }
 
-    private void verifyOutput(final String topic, final Set<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
-        final Properties properties = mkProperties(
-            mkMap(
-                mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
-                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
-                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
-            )
-        );
-        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
-    }
-
     /**
      * scaling to ensure that there are commits in between the various test events,
      * just to exercise that everything works properly in the presence of commits.
      */
     private long scaledTime(final long unscaledTime) {
-        return COMMIT_INTERVAL * 2 * unscaledTime;
+        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
     private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 9b9722e8539..3d7c141129f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -83,12 +84,8 @@ import static org.hamcrest.Matchers.empty;
 @Tag("integration")
 @Timeout(600)
 public class SuppressionIntegrationTest {
-
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
-        1,
-        mkProperties(mkMap()),
-        0L
-    );
+    private static final long NOW = Instant.now().toEpochMilli();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -525,7 +522,7 @@ public class SuppressionIntegrationTest {
      * just to exercise that everything works properly in the presence of commits.
      */
     private static long scaledTime(final long unscaledTime) {
-        return COMMIT_INTERVAL * 2 * unscaledTime;
+        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
     private static void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 0238dcae621..9f025ee414a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -61,7 +61,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @Timeout(600)
 public class TaskMetadataIntegrationTest {
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyMap(), 0L, 0L);
 
     @BeforeAll
     public static void startCluster() throws IOException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index f4f03b98330..16334714168 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -16,19 +16,36 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.server.KafkaServer;
-import kafka.zk.EmbeddedZookeeper;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ConfigType;
 import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.config.ServerLogConfigs;
-import org.apache.kafka.server.config.ZkConfigs;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.test.TestCondition;
@@ -37,115 +54,143 @@ import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 
 /**
- * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers.
+ * Sets up an embedded Kafka KRaft cluster for integration tests (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties.
+ * Additional Kafka client properties can also be supplied if required.
+ * This class also provides various utility methods to easily create Kafka topics, produce data, consume data, etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
-    private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
-    private static final int TOPIC_CREATION_TIMEOUT = 30000;
-    private static final int TOPIC_DELETION_TIMEOUT = 30000;
-    private EmbeddedZookeeper zookeeper = null;
-    private final KafkaEmbedded[] brokers;
-
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final List<Properties> brokerConfigOverrides;
     public final MockTime time;
-
     public EmbeddedKafkaCluster(final int numBrokers) {
         this(numBrokers, new Properties());
     }
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
-        this(numBrokers, brokerConfig, System.currentTimeMillis());
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
+        this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
                                 final Properties brokerConfig,
                                 final long mockTimeMillisStart) {
-        this(numBrokers, brokerConfig, Collections.emptyList(), mockTimeMillisStart);
-    }
-
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final List<Properties> brokerConfigOverrides) {
-        this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis());
+        this(numBrokers, brokerConfig, Collections.emptyMap(), mockTimeMillisStart, System.nanoTime());
     }
-
     public EmbeddedKafkaCluster(final int numBrokers,
                                 final Properties brokerConfig,
-                                final List<Properties> brokerConfigOverrides,
-                                final long mockTimeMillisStart) {
-        this(numBrokers, brokerConfig, brokerConfigOverrides, mockTimeMillisStart, System.nanoTime());
+                                final Map<Integer, Map<String, String>> brokerConfigOverrides) {
+        this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis(), System.nanoTime());
     }
-
     public EmbeddedKafkaCluster(final int numBrokers,
                                 final Properties brokerConfig,
-                                final List<Properties> brokerConfigOverrides,
+                                final Map<Integer, Map<String, String>> brokerConfigOverrides,
                                 final long mockTimeMillisStart,
                                 final long mockTimeNanoStart) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig);
+
         if (!brokerConfigOverrides.isEmpty() && brokerConfigOverrides.size() != numBrokers) {
             throw new IllegalArgumentException("Size of brokerConfigOverrides " + brokerConfigOverrides.size()
-                + " must match broker number " + numBrokers);
+                    + " must match broker number " + numBrokers);
+        }
+        try {
+            final KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCombined(true)
+                            .setNumBrokerNodes(numBrokers)
+                            .setPerServerProperties(brokerConfigOverrides)
+                            // Reduce number of controllers for faster startup
+                            // We may make this configurable in the future if there's a use case for it
+                            .setNumControllerNodes(1)
+                            .build()
+            );
+
+            brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v));
+            cluster = clusterBuilder.build();
+            cluster.nonFatalFaultHandler().setIgnore(true);
+        } catch (final Exception e) {
+            throw new KafkaException("Failed to create test Kafka cluster", e);
         }
-        brokers = new KafkaEmbedded[numBrokers];
         this.brokerConfig = brokerConfig;
-        time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
-        this.brokerConfigOverrides = brokerConfigOverrides;
+        this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
+    }
+
+    public void start() {
+        try {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        } catch (final Exception e) {
+            throw new KafkaException("Failed to start test Kafka cluster", e);
+        }
+
+        verifyClusterReadiness();
     }
 
     /**
-     * Creates and starts a Kafka cluster.
+     * Perform an extended check to ensure that the primary APIs of the cluster are available, including:
+     * <ul>
+     *     <li>Ability to create a topic</li>
+     *     <li>Ability to produce to a topic</li>
+     *     <li>Ability to form a consumer group</li>
+     *     <li>Ability to consume from a topic</li>
+     * </ul>
+     * If this method completes successfully, all resources created to verify the cluster health
+     * (such as topics and consumer groups) will be cleaned up before it returns.
+     * <p>
+     * This provides extra guarantees compared to other cluster readiness checks such as
+     * {@link KafkaClusterTestKit#waitForReadyBrokers()}, which verify that brokers have
+     * completed startup and joined the cluster, but do not verify that the internal consumer
+     * offsets topic has been created or that it's actually possible for users to create and
+     * interact with topics.
      */
-    public void start() throws IOException {
-        log.debug("Initiating embedded Kafka cluster startup");
-        log.debug("Starting a ZooKeeper instance");
-        zookeeper = new EmbeddedZookeeper();
-        log.debug("ZooKeeper instance is running at {}", zKConnectString());
-
-        brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString());
-        putIfAbsent(brokerConfig, SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
-        putIfAbsent(brokerConfig, ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true);
-        putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
-        putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 0);
-        putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
-        putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 1);
-        putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 5);
-        putIfAbsent(brokerConfig, TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
-        putIfAbsent(brokerConfig, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
-
-        for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(ServerConfigs.BROKER_ID_CONFIG, i);
-            log.debug("Starting a Kafka instance on {} ...", brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
-
-            final Properties effectiveConfig = new Properties();
-            effectiveConfig.putAll(brokerConfig);
-            if (brokerConfigOverrides != null && brokerConfigOverrides.size() > i) {
-                effectiveConfig.putAll(brokerConfigOverrides.get(i));
+    public void verifyClusterReadiness() {
+        final UUID uuid = UUID.randomUUID();
+        final String consumerGroupId = "group-warmup-" + uuid;
+        final Map<String, Object> consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
+        final String topic = "topic-warmup-" + uuid;
+
+        createTopic(topic);
+        final Map<String, Object> producerProps = new HashMap<>(clientDefaultConfig());
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "warmup-producer");
+        produce(producerProps, topic, null, "warmup message key", "warmup message value");
+
+        try (Consumer<?, ?> consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) {
+            final ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(TimeUnit.MINUTES.toMillis(2)));
+            if (records.isEmpty()) {
+                throw new AssertionError("Failed to verify availability of group coordinator and produce/consume APIs on Kafka cluster in time");
             }
-            brokers[i] = new KafkaEmbedded(effectiveConfig, time);
-
-            log.debug("Kafka instance is running at {}, connected to ZooKeeper 
at {}",
-                brokers[i].brokerList(), brokers[i].zookeeperConnect());
         }
-    }
 
-    private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            brokerConfig.put(propertyKey, propertyValue);
+        try (Admin admin = createAdminClient()) {
+            admin.deleteConsumerGroups(Collections.singleton(consumerGroupId)).all().get(30, TimeUnit.SECONDS);
+            admin.deleteTopics(Collections.singleton(topic)).all().get(30, TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | TimeoutException e) {
+            throw new AssertionError("Failed to clean up cluster health check resource(s)", e);
         }
     }
 
@@ -153,46 +198,22 @@ public class EmbeddedKafkaCluster {
      * Stop the Kafka cluster.
      */
     public void stop() {
-        if (brokers.length > 1) {
-            // delete the topics first to avoid cascading leader elections while shutting down the brokers
-            final Set<String> topics = getAllTopicsInCluster();
-            if (!topics.isEmpty()) {
-                try (final Admin adminClient = brokers[0].createAdminClient()) {
-                    adminClient.deleteTopics(topics).all().get();
-                } catch (final InterruptedException e) {
-                    log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", e);
-                    throw new RuntimeException(e);
-                } catch (final ExecutionException | RuntimeException e) {
-                    log.warn("Couldn't delete all topics before stopping brokers", e);
-                }
-            }
+        final AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+        if (shutdownFailure.get() != null) {
+            throw new KafkaException("Failed to shut down producer / embedded Kafka cluster", shutdownFailure.get());
         }
-        for (final KafkaEmbedded broker : brokers) {
-            broker.stopAsync();
-        }
-        for (final KafkaEmbedded broker : brokers) {
-            broker.awaitStoppedAndPurge();
-        }
-        zookeeper.shutdown();
     }
 
-    /**
-     * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
-     * Example: `127.0.0.1:2181`.
-     * <p>
-     * You can use this to e.g. tell Kafka brokers how to connect to this instance.
-     */
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+    public String bootstrapServers() {
+        return cluster.bootstrapServers();
     }
 
-    /**
-     * This cluster's `bootstrap.servers` value.  Example: `127.0.0.1:9092`.
-     * <p>
-     * You can use this to tell Kafka producers how to connect to this cluster.
-     */
-    public String bootstrapServers() {
-        return brokers[0].brokerList();
+    public boolean sslEnabled() {
+        final String listenerSecurityProtocolMap = brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG);
+        if (listenerSecurityProtocolMap == null)
+            return false;
+        return listenerSecurityProtocolMap.contains(":SSL") || listenerSecurityProtocolMap.contains(":SASL_SSL");
     }
 
     /**
@@ -211,8 +232,18 @@ public class EmbeddedKafkaCluster {
      *
      * @param topic The name of the topic.
      */
-    public void createTopic(final String topic) throws InterruptedException {
-        createTopic(topic, 1, 1, Collections.emptyMap());
+    public void createTopic(final String topic) {
+        createTopic(topic, 1);
+    }
+
+    /**
+     * Create a Kafka topic with the given number of partitions and a replication factor of 1.
+     *
+     * @param topic The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     */
+    public void createTopic(final String topic, final int partitions) {
+        createTopic(topic, partitions, 1, Collections.emptyMap());
     }
 
     /**
@@ -227,116 +258,177 @@ public class EmbeddedKafkaCluster {
     }
 
     /**
-     * Create a Kafka topic with the given parameters.
+     * Create a Kafka topic with the given number of partitions, replication factor, and topic config.
      *
-     * @param topic       The name of the topic.
+     * @param topic The name of the topic.
      * @param partitions  The number of partitions for this topic.
      * @param replication The replication factor for (partitions of) this topic.
      * @param topicConfig Additional topic-level configuration settings.
      */
-    public void createTopic(final String topic,
-                            final int partitions,
-                            final int replication,
-                            final Map<String, String> topicConfig) throws InterruptedException {
-        brokers[0].createTopic(topic, partitions, replication, topicConfig);
-        final List<TopicPartition> topicPartitions = new ArrayList<>();
-        for (int partition = 0; partition < partitions; partition++) {
-            topicPartitions.add(new TopicPartition(topic, partition));
+    public void createTopic(final String topic, final int partitions, final int replication, final Map<String, String> topicConfig) {
+        if (replication > cluster.brokers().size()) {
+            throw new InvalidReplicationFactorException("Insufficient brokers ("
+                    + cluster.brokers().size() + ") for desired replication (" + replication + ")");
         }
-        IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT);
-    }
 
-    /**
-     * Deletes a topic returns immediately.
-     *
-     * @param topic the name of the topic
-     */
-    public void deleteTopic(final String topic) throws InterruptedException {
-        deleteTopicsAndWait(-1L, topic);
+        log.info("Creating topic { name: {}, partitions: {}, replication: {}, 
config: {} }",
+                topic, partitions, replication, topicConfig);
+        final NewTopic newTopic = new NewTopic(topic, partitions, (short) 
replication);
+        newTopic.configs(topicConfig);
+
+        try (final Admin adminClient = createAdminClient()) {
+            
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+            TestUtils.waitForCondition(() -> 
adminClient.listTopics().names().get().contains(topic),
+                    "Wait for topic " + topic + " to get created.");
+        } catch (final TopicExistsException ignored) {
+        } catch (final InterruptedException | ExecutionException e) {
+            if (!(e.getCause() instanceof TopicExistsException)) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
-    /**
-     * Deletes a topic and blocks for max 30 sec until the topic got deleted.
-     *
-     * @param topic the name of the topic
-     */
-    public void deleteTopicAndWait(final String topic) throws InterruptedException {
-        deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
+    public void deleteTopics(final String... topics) {
+        for (final String topic : topics) {
+            deleteTopic(topic);
+        }
     }
 
+
     /**
-     * Deletes multiple topics returns immediately.
+     * Delete a Kafka topic.
      *
-     * @param topics the name of the topics
+     * @param topic the topic to delete; may not be null
      */
-    public void deleteTopics(final String... topics) throws InterruptedException {
-        deleteTopicsAndWait(-1, topics);
+    public void deleteTopic(final String topic) {
+        try (final Admin adminClient = createAdminClient()) {
+            adminClient.deleteTopics(Collections.singleton(topic)).all().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     /**
-     * Deletes multiple topics and blocks for max 30 sec until all topics got deleted.
-     *
-     * @param topics the name of the topics
+     * Delete all topics except internal topics.
      */
-    public void deleteTopicsAndWait(final String... topics) throws InterruptedException {
-        deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
+    public void deleteAllTopics() {
+        try (final Admin adminClient = createAdminClient()) {
+            final Set<String> topics = adminClient.listTopics().names().get();
+            adminClient.deleteTopics(topics).all().get();
+        } catch (final UnknownTopicOrPartitionException ignored) {
+        } catch (final ExecutionException | InterruptedException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     /**
-     * Deletes multiple topics and blocks until all topics got deleted.
-     *
-     * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
-     * @param topics the name of the topics
+     * Produce given key and value to topic partition.
+     * @param topic the topic to produce to; may not be null.
+     * @param partition the topic partition to produce to.
+     * @param key the record key.
+     * @param value the record value.
      */
-    public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws InterruptedException {
-        for (final String topic : topics) {
+    public void produce(final Map<String, Object> producerProps, final String topic, final Integer partition, final String key, final String value) {
+        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
+            final ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes());
             try {
-                brokers[0].deleteTopic(topic);
-            } catch (final UnknownTopicOrPartitionException ignored) { }
+                producer.send(msg).get(TimeUnit.SECONDS.toMillis(120), TimeUnit.MILLISECONDS);
+                producer.flush();
+            } catch (final Exception e) {
+                throw new KafkaException("Could not produce message: " + msg, e);
+            }
         }
+    }
+
+    public Admin createAdminClient() {
+        return Admin.create(mkProperties(clientDefaultConfig()));
+    }
 
-        if (timeoutMs > 0) {
-            TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+    public Map<String, String> clientDefaultConfig() {
+        final Map<String, String> props = new HashMap<>();
+        props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        if (sslEnabled()) {
+            props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG).toString());
+            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
+            props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
         }
+        return props;
     }
 
-    /**
-     * Deletes all topics and blocks until all topics got deleted.
-     *
-     * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
-     */
-    public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
-        final Set<String> topics = getAllTopicsInCluster();
-        for (final String topic : topics) {
-            try {
-                brokers[0].deleteTopic(topic);
-            } catch (final UnknownTopicOrPartitionException ignored) { }
+    public KafkaConsumer<byte[], byte[]> createConsumer(final Map<String, Object> consumerProps) {
+        final Map<String, Object> props = new HashMap<>(clientDefaultConfig());
+        props.putAll(consumerProps);
+
+        props.putIfAbsent(GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        props.putIfAbsent(ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.putIfAbsent(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        final KafkaConsumer<byte[], byte[]> consumer;
+        try {
+            consumer = new KafkaConsumer<>(props);
+        } catch (final Throwable t) {
+            throw new KafkaException("Failed to create consumer", t);
         }
+        return consumer;
+    }
+
+    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final Map<String, Object> consumerProps, final String... topics) {
+        return createConsumerAndSubscribeTo(consumerProps, null, topics);
+    }
 
-        if (timeoutMs > 0) {
-            TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final Map<String, Object> consumerProps, final ConsumerRebalanceListener rebalanceListener, final String... topics) {
+        final KafkaConsumer<byte[], byte[]> consumer = createConsumer(consumerProps);
+        if (rebalanceListener != null) {
+            consumer.subscribe(Arrays.asList(topics), rebalanceListener);
+        } else {
+            consumer.subscribe(Arrays.asList(topics));
         }
+        return consumer;
+    }
+
+    private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) {
+        brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
+        brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "0");
+        brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
+        brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
+        brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+        brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "5");
+        brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+        brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
+        brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true);
     }
 
     public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws InterruptedException {
         TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds.");
     }
 
-    private final class TopicsDeletedCondition implements TestCondition {
-        final Set<String> deletedTopics = new HashSet<>();
-
-        private TopicsDeletedCondition(final String... topics) {
-            Collections.addAll(deletedTopics, topics);
-        }
-
-        private TopicsDeletedCondition(final Collection<String> topics) {
-            deletedTopics.addAll(topics);
+    public Set<String> getAllTopicsInCluster() {
+        try (final Admin adminClient = createAdminClient()) {
+            return adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
         }
+    }
 
-        @Override
-        public boolean conditionMet() {
-            final Set<String> allTopics = getAllTopicsInCluster();
-            return !allTopics.removeAll(deletedTopics);
+    public Properties getLogConfig(final String topic) {
+        try (final Admin adminClient = createAdminClient()) {
+            final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+            final Config config = adminClient.describeConfigs(Collections.singleton(configResource)).values().get(configResource).get();
+            final Properties properties = new Properties();
+            for (final ConfigEntry configEntry : config.entries()) {
+                if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) {
+                    properties.put(configEntry.name(), configEntry.value());
+                }
+            }
+            return properties;
+        } catch (final InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
         }
     }
 
@@ -353,25 +445,4 @@ public class EmbeddedKafkaCluster {
             return allTopics.equals(remainingTopics);
         }
     }
-
-    private List<KafkaServer> brokers() {
-        final List<KafkaServer> servers = new ArrayList<>();
-        for (final KafkaEmbedded broker : brokers) {
-            servers.add(broker.kafkaServer());
-        }
-        return servers;
-    }
-
-    public Properties getLogConfig(final String topic) {
-        return brokers[0].kafkaServer().zkClient().getEntityConfigs(ConfigType.TOPIC, topic);
-    }
-
-    public Set<String> getAllTopicsInCluster() {
-        final scala.collection.Iterator<String> topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator();
-        final Set<String> topics = new HashSet<>();
-        while (topicsIterator.hasNext()) {
-            topics.add(topicsIterator.next());
-        }
-        return topics;
-    }
 }
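
Taken together, a test can drive the migrated cluster end to end with just the methods above. A minimal sketch; the topic name and payloads are illustrative:

    final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1);
    cluster.start();                       // format, start up, and verify readiness
    cluster.createTopic("input", 2);       // 2 partitions, replication factor 1
    cluster.produce(new HashMap<>(cluster.clientDefaultConfig()), "input", null, "key", "value");
    try (KafkaConsumer<byte[], byte[]> consumer =
             cluster.createConsumerAndSubscribeTo(Collections.emptyMap(), "input")) {
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
        // assert on the records produced above ...
    }
    cluster.stop();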
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 007da4c0789..536e43b715a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -75,6 +75,7 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -104,7 +105,6 @@ import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -294,7 +294,7 @@ public class IntegrationTestUtils {
                                             final int replicationCount,
                                             final String... topics) {
         try {
-            cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
+            cluster.deleteAllTopics();
             for (final String topic : topics) {
                 cluster.createTopic(topic, partitionCount, replicationCount);
             }
@@ -306,9 +306,9 @@ public class IntegrationTestUtils {
     public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
         try {
             driver.cleanUp();
-            cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-        } catch (final RuntimeException | InterruptedException e) {
-            LOG.warn("Ignoring failure to clean test state", e);
+            cluster.deleteAllTopics();
+        } catch (final RuntimeException e) {
+            LOG.warn("Ignoring failure to clean test state");
         }
     }
 
@@ -1167,6 +1167,10 @@ public class IntegrationTestUtils {
         if (results.size() != expected.size()) {
             throw new AssertionError(printRecords(results) + " != " + expected);
         }
+        // sort expected and results by key before comparing them
+        expected.sort(Comparator.comparing(e -> e.key().toString()));
+        results.sort(Comparator.comparing(e -> e.key().toString()));
+
         final Iterator<KeyValueTimestamp<K, V>> expectedIterator = expected.iterator();
         for (final ConsumerRecord<K, V> result : results) {
             final KeyValueTimestamp<K, V> expected1 = expectedIterator.next();
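
The sort added above makes this comparison independent of consumption order: with more than one partition, records for different keys can interleave arbitrarily, so both lists are normalized the same way before the pairwise check. An illustration with hypothetical records:

    final List<KeyValueTimestamp<String, String>> expected = new ArrayList<>(Arrays.asList(
            new KeyValueTimestamp<>("b", "2", 0L),
            new KeyValueTimestamp<>("a", "1", 0L)));
    expected.sort(Comparator.comparing(e -> e.key().toString()));  // now ordered a, b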
@@ -1178,28 +1182,6 @@ public class IntegrationTestUtils {
         }
     }
 
-    public static void verifyKeyValueTimestamps(final Properties consumerConfig,
-                                                final String topic,
-                                                final Set<KeyValueTimestamp<String, Long>> expected) {
-        final List<ConsumerRecord<String, Long>> results;
-        try {
-            results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        if (results.size() != expected.size()) {
-            throw new AssertionError(printRecords(results) + " != " + 
expected);
-        }
-
-        final Set<KeyValueTimestamp<String, Long>> actual =
-            results.stream()
-                   .map(result -> new KeyValueTimestamp<>(result.key(), result.value(), result.timestamp()))
-                   .collect(Collectors.toSet());
-
-        assertThat(actual, equalTo(expected));
-    }
-
     private static <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record,
                                                         final K expectedKey,
                                                         final V expectedValue,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
deleted file mode 100644
index 5937d287e0c..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration.utils;
-
-import kafka.cluster.EndPoint;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.TestUtils;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ServerConfigs;
-import org.apache.kafka.server.config.ZkConfigs;
-import org.apache.kafka.server.util.MockTime;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.NUM_PARTITIONS_CONFIG;
-
-
-/**
- * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at 
`127.0.0.1:9092` by
- * default.
- * <p>
- * Requires a running ZooKeeper instance to connect to.
- */
-public class KafkaEmbedded {
-
-    private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
-
-    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
-
-    private final Properties effectiveConfig;
-    private final File logDir;
-    private final File tmpFolder;
-    private final KafkaServer kafka;
-
-    /**
-     * Creates and starts an embedded Kafka broker.
-     *
-     * @param config Broker configuration settings.  Used to modify, for example, on which port the
-     *               broker should listen to.  Note that you cannot change the `log.dirs` setting
-     *               currently.
-     */
-    @SuppressWarnings({"WeakerAccess", "this-escape"})
-    public KafkaEmbedded(final Properties config, final MockTime time) throws IOException {
-        tmpFolder = org.apache.kafka.test.TestUtils.tempDirectory();
-        logDir = org.apache.kafka.test.TestUtils.tempDirectory(tmpFolder.toPath(), "log");
-        effectiveConfig = effectiveConfigFrom(config);
-        final boolean loggingEnabled = true;
-        final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
-        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
-            logDir, zookeeperConnect());
-        kafka = TestUtils.createServer(kafkaConfig, time);
-        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
-            brokerList(), zookeeperConnect());
-    }
-
-    /**
-     * Creates the configuration for starting the Kafka broker by merging default values with
-     * overwrites.
-     *
-     * @param initialConfig Broker configuration settings that override the default config.
-     */
-    private Properties effectiveConfigFrom(final Properties initialConfig) {
-        final Properties effectiveConfig = new Properties();
-        effectiveConfig.put(ServerConfigs.BROKER_ID_CONFIG, 0);
-        effectiveConfig.put(NUM_PARTITIONS_CONFIG, 1);
-        effectiveConfig.put(AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
-        effectiveConfig.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, 1000000);
-        effectiveConfig.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true);
-        effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, 10000);
-
-        effectiveConfig.putAll(initialConfig);
-        effectiveConfig.setProperty(LOG_DIR_CONFIG, logDir.getAbsolutePath());
-        return effectiveConfig;
-    }
-
-    /**
-     * This broker's `metadata.broker.list` value.  Example: `localhost:9092`.
-     * <p>
-     * You can use this to tell Kafka producers and consumers how to connect to this instance.
-     */
-    @SuppressWarnings("WeakerAccess")
-    public String brokerList() {
-        final EndPoint endPoint = kafka.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-
-    /**
-     * The ZooKeeper connection string aka `zookeeper.connect`.
-     */
-    @SuppressWarnings("WeakerAccess")
-    public String zookeeperConnect() {
-        return effectiveConfig.getProperty("zookeeper.connect", 
DEFAULT_ZK_CONNECT);
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public void stopAsync() {
-        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble 
at {}) ...",
-                  brokerList(), zookeeperConnect());
-        kafka.shutdown();
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public void awaitStoppedAndPurge() {
-        kafka.awaitShutdown();
-        log.debug("Removing log dir at {} ...", logDir);
-        try {
-            Utils.delete(tmpFolder);
-        } catch (final IOException e) {
-            throw new RuntimeException(e);
-        }
-        log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK 
ensemble at {}) ...",
-            brokerList(), zookeeperConnect());
-    }
-
-    /**
-     * Create a Kafka topic with 1 partition and a replication factor of 1.
-     *
-     * @param topic The name of the topic.
-     */
-    public void createTopic(final String topic) {
-        createTopic(topic, 1, 1, Collections.emptyMap());
-    }
-
-    /**
-     * Create a Kafka topic with the given parameters.
-     *
-     * @param topic       The name of the topic.
-     * @param partitions  The number of partitions for this topic.
-     * @param replication The replication factor for (the partitions of) this topic.
-     */
-    public void createTopic(final String topic, final int partitions, final int replication) {
-        createTopic(topic, partitions, replication, Collections.emptyMap());
-    }
-
-    /**
-     * Create a Kafka topic with the given parameters.
-     *
-     * @param topic       The name of the topic.
-     * @param partitions  The number of partitions for this topic.
-     * @param replication The replication factor for (partitions of) this topic.
-     * @param topicConfig Additional topic-level configuration settings.
-     */
-    public void createTopic(final String topic,
-                            final int partitions,
-                            final int replication,
-                            final Map<String, String> topicConfig) {
-        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, 
config: {} }",
-            topic, partitions, replication, topicConfig);
-        final NewTopic newTopic = new NewTopic(topic, partitions, (short) 
replication);
-        newTopic.configs(topicConfig);
-
-        try (final Admin adminClient = createAdminClient()) {
-            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
-        } catch (final InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public Admin createAdminClient() {
-        final Properties adminClientConfig = new Properties();
-        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
-        final Object listeners = effectiveConfig.get(SocketServerConfigs.LISTENERS_CONFIG);
-        if (listeners != null && listeners.toString().contains("SSL")) {
-            adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-            adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
-            adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
-        }
-        return Admin.create(adminClientConfig);
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public void deleteTopic(final String topic) {
-        log.debug("Deleting topic { name: {} }", topic);
-        try (final Admin adminClient = createAdminClient()) {
-            adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
-        } catch (final InterruptedException | ExecutionException e) {
-            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public KafkaServer kafkaServer() {
-        return kafka;
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
index 56218c72900..1aeaa45d92c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
@@ -114,7 +114,7 @@ public class HandlingSourceTopicDeletionIntegrationTest {
             () -> "Kafka Streams clients did not reach state RUNNING"
         );
 
-        CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
+        CLUSTER.deleteTopic(INPUT_TOPIC);
 
         TestUtils.waitForCondition(
             () -> kafkaStreams1.state() == State.ERROR && kafkaStreams2.state() == State.ERROR,
