This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 61f61d62403 KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode (#16599)
61f61d62403 is described below

commit 61f61d62403e6b547a3a333895395ceff6d7afe5
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Jul 29 16:43:55 2024 +0200

    KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode (#16599)
    
    Reviewers: Omnia Ibrahim <[email protected]>, Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |   5 +-
 checkstyle/import-control.xml                      |   3 +
 .../MirrorConnectorsIntegrationBaseTest.java       |  93 ++++--
 .../MirrorConnectorsIntegrationSSLTest.java        |  12 +-
 ...rsWithCustomForwardingAdminIntegrationTest.java |  42 ++-
 .../connect/integration/BlockingConnectorTest.java |   4 +-
 .../integration/ConnectWorkerIntegrationTest.java  |  50 +++-
 .../ConnectorClientPolicyIntegrationTest.java      |   2 +-
 .../ConnectorRestartApiIntegrationTest.java        |   9 +-
 .../ConnectorTopicsIntegrationTest.java            |   4 +-
 .../ConnectorValidationIntegrationTest.java        |   2 -
 .../ExactlyOnceSourceIntegrationTest.java          |  93 ++++--
 .../integration/ExampleConnectIntegrationTest.java |   4 +-
 .../integration/InternalTopicsIntegrationTest.java |   2 +-
 .../integration/OffsetsApiIntegrationTest.java     |   1 -
 .../RebalanceSourceConnectorsIntegrationTest.java  |   4 +-
 .../integration/RestExtensionIntegrationTest.java  |   4 +-
 .../SessionedProtocolIntegrationTest.java          |   4 +-
 .../integration/SinkConnectorsIntegrationTest.java |   4 +-
 .../SourceConnectorsIntegrationTest.java           |   4 +-
 .../integration/TransformationIntegrationTest.java |   4 +-
 .../connect/util/clusters/EmbeddedConnect.java     |   4 +-
 .../util/clusters/EmbeddedConnectCluster.java      |   8 +-
 .../util/clusters/EmbeddedConnectStandalone.java   |   2 +-
 .../util/clusters/EmbeddedKafkaCluster.java        | 325 +++++++--------------
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  29 +-
 26 files changed, 371 insertions(+), 347 deletions(-)

diff --git a/build.gradle b/build.gradle
index 79f61289bbd..7028c409f83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3193,13 +3193,14 @@ project(':connect:runtime') {
     testImplementation project(':core')
     testImplementation project(':server')
     testImplementation project(':metadata')
+    testImplementation project(':server-common')
     testImplementation project(':core').sourceSets.test.output
     testImplementation project(':server-common')
     testImplementation project(':server')
     testImplementation project(':group-coordinator')
     testImplementation project(':storage')
     testImplementation project(':connect:test-plugins')
-    testImplementation project(':group-coordinator')
+    testImplementation project(':server-common').sourceSets.test.output
 
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
@@ -3317,6 +3318,7 @@ project(':connect:file') {
     testImplementation project(':connect:runtime').sourceSets.test.output
     testImplementation project(':core')
     testImplementation project(':core').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
   }
 
   javadoc {
@@ -3418,6 +3420,7 @@ project(':connect:mirror') {
     testImplementation project(':core')
     testImplementation project(':core').sourceSets.test.output
     testImplementation project(':server')
+    testImplementation project(':server-common').sourceSets.test.output
 
     testRuntimeOnly project(':connect:runtime')
     testRuntimeOnly libs.slf4jReload4j
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c6ef445c044..08c45d0aa38 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -592,6 +592,7 @@
         <allow pkg="org.apache.kafka.server.config" />
         <allow pkg="kafka.cluster" />
         <allow pkg="kafka.server" />
+        <allow pkg="kafka.testkit" />
         <allow pkg="kafka.zk" />
         <allow pkg="kafka.utils" />
         <allow class="javax.servlet.http.HttpServletResponse" />
@@ -612,6 +613,8 @@
       <allow pkg="org.eclipse.jetty.util"/>
       <!-- for tests -->
       <allow pkg="org.apache.kafka.server.util" />
+      <allow pkg="org.apache.kafka.server.config" />
+      <allow pkg="kafka.server"/>
     </subpackage>
 
     <subpackage name="json">
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 3b6f52ca191..80f8d2c3bda 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
@@ -56,6 +56,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.apache.kafka.test.TestCondition;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -78,7 +79,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +86,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX;
@@ -301,12 +302,30 @@ public class MirrorConnectorsIntegrationBaseTest {
         MirrorClient primaryClient = new 
MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
         MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
 
-        // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(primary, reverseTopic1);
-        waitForTopicCreated(backup, backupTopic1);
         waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, 
getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG),
-                "topic config was not synced");
+
+        TestCondition assertBackupTopicConfig = () -> {
+            String compactPolicy = getTopicConfig(backup.kafka(), 
backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG);
+            assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, compactPolicy, 
"topic config was not synced");
+            return true;
+        };
+
+        if (replicateBackupToPrimary) {
+            // make sure the topics are auto-created in the other cluster
+            waitForTopicCreated(primary, reverseTopic1);
+            waitForTopicCreated(backup, backupTopic1);
+            assertBackupTopicConfig.conditionMet();
+        } else {
+            // The backup and reverse topics are identical to the topics we 
created while setting up the test;
+            // we don't have to wait for them to be created, but there might 
be a small delay between
+            // now and when MM2 is able to sync the config for the backup topic
+            waitForCondition(
+                    assertBackupTopicConfig,
+                    10_000, // Topic config sync interval is one second; this 
should be plenty of time
+                    "topic config was not synced in time"
+            );
+        }
+
         createAndTestNewTopicWithConfigFilter();
 
         assertEquals(NUM_RECORDS_PRODUCED, 
primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, 
"test-topic-1").count(),
@@ -429,12 +448,12 @@ public class MirrorConnectorsIntegrationBaseTest {
         try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
             waitForConsumingAllRecords(primaryConsumer, expectedRecords);
         }
-        
+
         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-        
+
         // sleep few seconds to have MM2 finish replication so that "end" 
consumer will consume some record
         Thread.sleep(TimeUnit.SECONDS.toMillis(3));
 
@@ -445,7 +464,7 @@ public class MirrorConnectorsIntegrationBaseTest {
                 backupTopic)) {
             waitForConsumingAllRecords(backupConsumer, expectedRecords);
         }
-        
+
         try (Admin backupClient = backup.kafka().createAdminClient()) {
             // retrieve the consumer group offset from backup cluster
             Map<TopicPartition, OffsetAndMetadata> remoteOffsets =
@@ -1189,14 +1208,11 @@ public class MirrorConnectorsIntegrationBaseTest {
      * @param records Records to send in one parallel batch
      */
     protected void produceMessages(Producer<byte[], byte[]> producer, 
List<ProducerRecord<byte[], byte[]>> records) {
-        List<Future<RecordMetadata>> futures = new ArrayList<>();
-        for (ProducerRecord<byte[], byte[]> record : records) {
-            futures.add(producer.send(record));
-        }
         Timer timer = Time.SYSTEM.timer(RECORD_PRODUCE_DURATION_MS);
+
         try {
-            for (Future<RecordMetadata> future : futures) {
-                future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            for (ProducerRecord<byte[], byte[]> record : records) {
+                producer.send(record).get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
                 timer.update();
             }
         } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
@@ -1397,13 +1413,46 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * Generate some consumer activity on both clusters to ensure the 
checkpoint connector always starts promptly
      */
-    protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        try (Consumer<byte[], byte[]> dummyConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
-        }
-        try (Consumer<byte[], byte[]> dummyConsumer = 
backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+    protected final void warmUpConsumer(Map<String, Object> consumerProps) {
+        final String topic = "test-topic-1";
+        warmUpConsumer("primary", primary.kafka(), consumerProps, topic);
+        warmUpConsumer("backup", backup.kafka(), consumerProps, topic);
+    }
+
+    private void warmUpConsumer(String clusterName, EmbeddedKafkaCluster 
kafkaCluster, Map<String, Object> consumerProps, String topic) {
+        try (Consumer<?, ?> dummyConsumer = 
kafkaCluster.createConsumerAndSubscribeTo(consumerProps, topic)) {
+            // poll to ensure we've joined the group
             dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+
+            // force the consumer to have a known position on every topic 
partition
+            // so that it will be able to commit offsets for that position
+            // (it's possible that poll returns before that has happened)
+            Set<TopicPartition> topicPartitionsPendingPosition = 
IntStream.range(0, NUM_PARTITIONS)
+                    .mapToObj(partition -> new TopicPartition(topic, 
partition))
+                    .collect(Collectors.toSet());
+            Timer positionTimer = Time.SYSTEM.timer(60_000);
+            while (!positionTimer.isExpired() && 
!topicPartitionsPendingPosition.isEmpty()) {
+                Set<TopicPartition> topicPartitionsWithPosition = new 
HashSet<>();
+
+                topicPartitionsPendingPosition.forEach(topicPartition -> {
+                    try {
+                        positionTimer.update();
+                        dummyConsumer.position(topicPartition, 
Duration.ofMillis(positionTimer.remainingMs()));
+                        topicPartitionsWithPosition.add(topicPartition);
+                    } catch (KafkaException e) {
+                        log.warn("Failed to calculate consumer position for {} 
on cluster {}", topicPartition, clusterName);
+                    }
+                });
+
+                
topicPartitionsPendingPosition.removeAll(topicPartitionsWithPosition);
+            }
+            assertEquals(
+                    Collections.emptySet(),
+                    topicPartitionsPendingPosition,
+                    "Failed to calculate consumer position for one or more 
partitions on cluster " + clusterName + " in time"
+            );
+
+            // And finally, commit offsets
             dummyConsumer.commitSync();
         }
     }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java
index 7999f5f9a4a..96a71710d0d 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ReplicationConfigs;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
 
@@ -42,25 +41,24 @@ public class MirrorConnectorsIntegrationSSLTest extends 
MirrorConnectorsIntegrat
     public void startClusters() throws Exception {
         Map<String, Object> sslConfig = TestSslUtils.createSslConfig(false, 
true, ConnectionMode.SERVER, TestUtils.tempFile(), "testCert");
         // enable SSL on backup kafka broker
-        backupBrokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, 
"SSL://localhost:0");
-        
backupBrokerProps.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, 
"SSL");
+        
backupBrokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "EXTERNAL:SSL,CONTROLLER:SSL");
         backupBrokerProps.putAll(sslConfig);
-        
+
         Properties sslProps = new Properties();
         sslProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
         sslProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) 
sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
         sslProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
-        
+
         // set SSL config for kafka connect worker
         
backupWorkerProps.putAll(sslProps.entrySet().stream().collect(Collectors.toMap(
             e -> String.valueOf(e.getKey()), e ->  
String.valueOf(e.getValue()))));
-        
+
         mm2Props.putAll(sslProps.entrySet().stream().collect(Collectors.toMap(
             e -> BACKUP_CLUSTER_ALIAS + "." + e.getKey(), e ->  
String.valueOf(e.getValue()))));
         // set SSL config for producer used by source task in MM2
         mm2Props.putAll(sslProps.entrySet().stream().collect(Collectors.toMap(
             e -> BACKUP_CLUSTER_ALIAS + ".producer." + e.getKey(), e ->  
String.valueOf(e.getValue()))));
-        
+
         super.startClusters();
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
index d94ce632ae1..853cd02f134 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
@@ -33,6 +34,9 @@ import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 import 
org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
 import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.config.KRaftConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -62,23 +66,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @Tag("integration")
 public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends 
MirrorConnectorsIntegrationBaseTest {
+
+    private static final int TOPIC_ACL_SYNC_DURATION_MS = 30_000;
     private static final int FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS = 
60_000;
 
     /*
      * enable ACL on brokers.
      */
     protected static void enableAclAuthorizer(Properties brokerProps) {
-        brokerProps.put("authorizer.class.name", 
"kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
-                "org.apache.kafka.common.security.plain.PlainLoginModule 
required "
-                        + "username=\"super\" "
-                        + "password=\"super_pwd\" "
-                        + "user_connector=\"connector_pwd\" "
-                        + "user_super=\"super_pwd\";");
+        
brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
"PLAIN");
+        
brokerProps.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
 "PLAIN");
+        
brokerProps.put(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, 
"PLAIN");
+        String listenerSaslJaasConfig = 
"org.apache.kafka.common.security.plain.PlainLoginModule required "
+                + "username=\"super\" "
+                + "password=\"super_pwd\" "
+                + "user_connector=\"connector_pwd\" "
+                + "user_super=\"super_pwd\";";
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config", 
listenerSaslJaasConfig);
+        brokerProps.put("listener.name.controller.plain.sasl.jaas.config", 
listenerSaslJaasConfig);
         brokerProps.put("super.users", "User:super");
     }
 
@@ -293,6 +300,7 @@ public class 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
     @Test
     public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception 
{
         mm2Props.put("sync.topic.acls.enabled", "true");
+        mm2Props.put("sync.topic.acls.interval.seconds", "1");
         mm2Config = new MirrorMakerConfig(mm2Props);
         List<AclBinding> aclBindings = Collections.singletonList(
                 new AclBinding(
@@ -324,8 +332,16 @@ public class 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
                 new AccessControlEntry("User:dummy", "*", 
AclOperation.DESCRIBE, AclPermissionType.ALLOW)
         );
 
-        assertTrue(getAclBindings(backup.kafka(), 
"primary.test-topic-1").contains(expectedACLOnBackupCluster), "topic ACLs was 
synced");
-        assertTrue(getAclBindings(primary.kafka(), 
"backup.test-topic-1").contains(expectedACLOnPrimaryCluster), "topic ACLs was 
synced");
+        // In some rare cases replica topics are created before ACLs are 
synced, so retry logic is necessary
+        waitForCondition(
+                () -> {
+                    assertTrue(getAclBindings(backup.kafka(), 
"primary.test-topic-1").contains(expectedACLOnBackupCluster), "topic ACLs are 
not synced on backup cluster");
+                    assertTrue(getAclBindings(primary.kafka(), 
"backup.test-topic-1").contains(expectedACLOnPrimaryCluster), "topic ACLs are 
not synced on primary cluster");
+                    return true;
+                },
+                TOPIC_ACL_SYNC_DURATION_MS,
+                "Topic ACLs were not synced in time"
+        );
 
         // expect to use FakeForwardingAdminWithLocalMetadata to update topic 
ACLs in FakeLocalMetadataStore.allAcls
         
assertTrue(FakeLocalMetadataStore.aclBindings("dummy").containsAll(Arrays.asList(expectedACLOnBackupCluster,
 expectedACLOnPrimaryCluster)));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index ab5cf207fe8..90a0e96a78a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -123,7 +123,7 @@ public class BlockingConnectorTest {
 
     @BeforeEach
     public void setup() throws Exception {
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -138,7 +138,7 @@ public class BlockingConnectorTest {
 
     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
         // unblock everything so that we don't leak threads after each test run
         Block.reset();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 96753b2100e..516d5a3f2cd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.SinkUtils;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.WorkerHandle;
+import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -57,6 +58,8 @@ import org.slf4j.event.Level;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.ServerSocket;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
@@ -118,6 +121,7 @@ public class ConnectWorkerIntegrationTest {
     private static final Logger log = 
LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);
 
     private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final long RECORD_TRANSFER_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(60);
     private static final long OFFSET_COMMIT_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
@@ -142,7 +146,7 @@ public class ConnectWorkerIntegrationTest {
         brokerProps = new Properties();
         brokerProps.put("auto.create.topics.enable", String.valueOf(false));
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connectBuilder = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -154,7 +158,7 @@ public class ConnectWorkerIntegrationTest {
     @AfterEach
     public void close(TestInfo testInfo) {
         log.info("Finished test {}", testInfo.getDisplayName());
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
@@ -244,8 +248,11 @@ public class ConnectWorkerIntegrationTest {
     public void testBrokerCoordinator() throws Exception {
         ConnectorHandle connectorHandle = 
RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
         
workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, 
String.valueOf(5000));
-        connect = connectBuilder.workerProps(workerProps).build();
+
+        useFixedBrokerPort();
+
         // start the clusters
+        connect = connectBuilder.build();
         connect.start();
         int numTasks = 4;
         // create test topic
@@ -263,7 +270,7 @@ public class ConnectWorkerIntegrationTest {
         // expect that the connector will be stopped once the coordinator is 
detected to be down
         StartAndStopLatch stopLatch = connectorHandle.expectedStops(1, false);
 
-        connect.kafka().stopOnlyKafka();
+        connect.kafka().stopOnlyBrokers();
 
         // Allow for the workers to discover that the coordinator is 
unavailable, wait is
         // heartbeat timeout * 2 + 4sec
@@ -294,7 +301,7 @@ public class ConnectWorkerIntegrationTest {
                         + CONNECTOR_SETUP_DURATION_MS + "ms");
 
         StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, 
false);
-        connect.kafka().startOnlyKafkaOnSamePorts();
+        connect.kafka().restartOnlyBrokers();
 
         // Allow for the kafka brokers to come back online
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
@@ -835,8 +842,10 @@ public class ConnectWorkerIntegrationTest {
         // Workaround for KAFKA-15676, which can cause the scheduled rebalance 
delay to
         // be spuriously triggered after the group coordinator for a Connect 
cluster is bounced
         workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
+
+        useFixedBrokerPort();
+
         connect = connectBuilder
-                .numBrokers(1)
                 .numWorkers(1)
                 .build();
         connect.start();
@@ -854,7 +863,7 @@ public class ConnectWorkerIntegrationTest {
 
         // Bring down Kafka, which should cause some REST requests to fail
         log.info("Stopping Kafka cluster");
-        connect.kafka().stopOnlyKafka();
+        connect.kafka().stopOnlyBrokers();
 
         // Try to reconfigure the connector, which should fail with a timeout 
error
         log.info("Trying to reconfigure connector while Kafka cluster is 
down");
@@ -863,7 +872,7 @@ public class ConnectWorkerIntegrationTest {
                 "flushing updates to the status topic"
         );
         log.info("Restarting Kafka cluster");
-        connect.kafka().startOnlyKafkaOnSamePorts();
+        connect.kafka().restartOnlyBrokers();
         connect.assertions().assertExactlyNumBrokersAreUp(1, "Broker did not 
complete startup in time");
         log.info("Kafka cluster is restarted");
 
@@ -1182,7 +1191,7 @@ public class ConnectWorkerIntegrationTest {
                 NUM_TASKS,
                 "Connector or its tasks did not start in time"
         );
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
 
         connect.deleteConnector(CONNECTOR_NAME);
 
@@ -1223,7 +1232,7 @@ public class ConnectWorkerIntegrationTest {
                 NUM_TASKS,
                 "Connector or its tasks did not start in time"
         );
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
 
         // See if any new records got written to the old topic
         final long nextEndOffset = 
connect.kafka().endOffset(connectorTopicPartition);
@@ -1282,7 +1291,7 @@ public class ConnectWorkerIntegrationTest {
                 NUM_TASKS,
                 "Connector or its tasks did not start in time"
         );
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
 
         // Delete the secrets file, which should render the old task configs 
invalid
         assertTrue(secretsFile.delete(), "Failed to delete secrets file");
@@ -1307,7 +1316,7 @@ public class ConnectWorkerIntegrationTest {
 
         // Wait for at least one task to commit offsets after being restarted
         connectorHandle.expectedCommits(1);
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
 
         final long endOffset = connect.kafka().endOffset(new 
TopicPartition(secondConnectorTopic, 0));
         assertTrue(
@@ -1446,6 +1455,23 @@ public class ConnectWorkerIntegrationTest {
         return props;
     }
 
+    private void useFixedBrokerPort() throws IOException {
+        // Find a free port and use it in the Kafka broker's listeners config. 
We can't use port 0 in the listeners
+        // config to get a random free port because in this test we want to 
stop the Kafka broker and then bring it
+        // back up and listening on the same port in order to verify that the 
Connect cluster can re-connect to Kafka
+        // and continue functioning normally. If we were to use port 0 here, 
the Kafka broker would most likely listen
+        // on a different random free port the second time it is started. Note 
that we can only use the static port
+        // because we have a single broker setup in this test.
+        int listenerPort;
+        try (ServerSocket s = new ServerSocket(0)) {
+            listenerPort = s.getLocalPort();
+        }
+        brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, 
String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", 
listenerPort));
+        connectBuilder
+                .numBrokers(1)
+                .brokerProps(brokerProps);
+    }
+
     public static class EmptyTaskConfigsConnector extends SinkConnector {
         @Override
         public String version() {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index 03cd000a19f..a127f85b12d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -110,7 +110,7 @@ public class ConnectorClientPolicyIntegrationTest {
         Properties exampleBrokerProps = new Properties();
         exampleBrokerProps.put("auto.create.topics.enable", "false");
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
             .name("connect-cluster")
             .numWorkers(NUM_WORKERS)
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
index c0c64c505dc..1c398a22396 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
@@ -119,7 +119,6 @@ public class ConnectorRestartApiIntegrationTest {
 
     @AfterAll
     public static void close() {
-        // stop all Connect, Kafka and Zk threads.
         CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
     }
 
@@ -127,7 +126,7 @@ public class ConnectorRestartApiIntegrationTest {
     public void testRestartUnknownConnectorNoParams() throws Exception {
         String connectorName = "Unknown";
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);
         // Call the Restart API
         String restartEndpoint = connect.endpointForResource(
@@ -148,7 +147,7 @@ public class ConnectorRestartApiIntegrationTest {
     private void restartUnknownConnector(boolean onlyFailed, boolean 
includeTasks) throws Exception {
         String connectorName = "Unknown";
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);
         // Call the Restart API
         String restartEndpoint = connect.endpointForResource(
@@ -299,7 +298,7 @@ public class ConnectorRestartApiIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
         props.put("connector.start.inject.error", "true");
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);
 
         // Try to start the connector and its single task.
@@ -330,7 +329,7 @@ public class ConnectorRestartApiIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
         tasksToFail.forEach(taskId -> props.put("task-" + taskId + 
".start.inject.error", "true"));
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);
 
         // Try to start the connector and its single task.
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index cdc0e3f3d66..fb4bbcdf408 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -90,7 +90,7 @@ public class ConnectorTopicsIntegrationTest {
         // setup Kafka broker properties
         brokerProps.put("auto.create.topics.enable", String.valueOf(false));
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connectBuilder = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -101,7 +101,7 @@ public class ConnectorTopicsIntegrationTest {
 
     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
index a82238600cb..2805504e360 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
@@ -80,7 +80,6 @@ public class ConnectorValidationIntegrationTest {
                 TestPlugins.pluginPathJoined(testPlugins)
         );
 
-        // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connector-validation-connect-cluster")
                 .workerProps(workerProps)
@@ -93,7 +92,6 @@ public class ConnectorValidationIntegrationTest {
     @AfterAll
     public static void close() {
         if (connect != null) {
-            // stop all Connect, Kafka and Zk threads.
             Utils.closeQuietly(connect::stop, "Embedded Connect cluster");
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 55a865d43cb..6bb12e8f178 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
@@ -42,6 +43,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -50,6 +52,10 @@ import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.ConnectAssertions;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.config.KRaftConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.test.NoRetryException;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -116,6 +122,7 @@ public class ExactlyOnceSourceIntegrationTest {
 
     private static final int CONSUME_RECORDS_TIMEOUT_MS = 60_000;
     private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30_000;
+    private static final int ACL_PROPAGATION_TIMEOUT_MS = 30_000;
     private static final int DEFAULT_NUM_WORKERS = 3;
 
     // Tests require that a minimum but not unreasonably large number of 
records are sourced.
@@ -140,7 +147,7 @@ public class ExactlyOnceSourceIntegrationTest {
         brokerProps.put("transaction.state.log.replication.factor", "1");
         brokerProps.put("transaction.state.log.min.isr", "1");
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by Kafka
         connectBuilder = new EmbeddedConnectCluster.Builder()
                 .numWorkers(DEFAULT_NUM_WORKERS)
                 .numBrokers(1)
@@ -159,7 +166,7 @@ public class ExactlyOnceSourceIntegrationTest {
     @AfterEach
     public void close() {
         try {
-            // stop all Connect, Kafka and Zk threads.
+            // stop the Connect cluster and its backing Kafka cluster.
             connect.stop();
         } finally {
             // Clear the handle for the connector. Fun fact: if you don't do 
this, your tests become quite flaky.
@@ -624,17 +631,18 @@ public class ExactlyOnceSourceIntegrationTest {
      */
     @Test
     public void testTasksFailOnInabilityToFence() throws Exception {
-        brokerProps.put("authorizer.class.name", 
"kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
-                "org.apache.kafka.common.security.plain.PlainLoginModule 
required "
-                        + "username=\"super\" "
-                        + "password=\"super_pwd\" "
-                        + "user_connector=\"connector_pwd\" "
-                        + "user_super=\"super_pwd\";");
+        
brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
"PLAIN");
+        
brokerProps.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
 "PLAIN");
+        
brokerProps.put(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, 
"PLAIN");
+        String listenerSaslJaasConfig = 
"org.apache.kafka.common.security.plain.PlainLoginModule required "
+                + "username=\"super\" "
+                + "password=\"super_pwd\" "
+                + "user_connector=\"connector_pwd\" "
+                + "user_super=\"super_pwd\";";
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config", 
listenerSaslJaasConfig);
+        brokerProps.put("listener.name.controller.plain.sasl.jaas.config", 
listenerSaslJaasConfig);
         brokerProps.put("super.users", "User:super");
 
         Map<String, String> superUserClientConfig = new HashMap<>();
@@ -694,12 +702,30 @@ public class ExactlyOnceSourceIntegrationTest {
             )).all().get();
         }
 
-        StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);
-
         log.info("Bringing up connector with fresh slate; fencing should not 
be necessary");
         connect.configureConnector(CONNECTOR_NAME, props);
-        assertConnectorStarted(connectorStart);
-        // Verify that the connector and its tasks have been able to start 
successfully
+
+        // Hack: There is a small chance that our recent ACL updates for the 
connector have
+        // not yet been propagated across the entire Kafka cluster, and that 
our connector
+        // will fail on startup when it tries to list the end offsets of the 
worker's offsets topic.
+        // So, we implement some retry logic here to add a layer of resiliency 
in that case
+        waitForCondition(
+                () -> {
+                    ConnectorStateInfo status = 
connect.connectorStatus(CONNECTOR_NAME);
+                    if ("RUNNING".equals(status.connector().state())) {
+                        return true;
+                    } else if ("FAILED".equals(status.connector().state())) {
+                        log.debug("Restarting failed connector {}", 
CONNECTOR_NAME);
+                        connect.restartConnector(CONNECTOR_NAME);
+                    }
+                    return false;
+                },
+                ACL_PROPAGATION_TIMEOUT_MS,
+                "Connector was not able to start in time, "
+                        + "or ACL updates were not propagated across the Kafka 
cluster soon enough"
+        );
+
+        // Also verify that the connector's tasks have been able to start 
successfully
         
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 tasksMax, "Connector and task should have started successfully");
 
         log.info("Reconfiguring connector; fencing should be necessary, and 
tasks should fail to start");
@@ -725,8 +751,39 @@ public class ExactlyOnceSourceIntegrationTest {
 
         log.info("Restarting connector after tweaking its ACLs; fencing should 
succeed this time");
         connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false);
+
         // Verify that the connector and its tasks have been able to restart 
successfully
-        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 tasksMax, "Connector and task should have restarted successfully");
+        // Use the same retry logic as above, in case there is a delay in the 
propagation of our ACL updates
+        waitForCondition(
+                () -> {
+                    ConnectorStateInfo status = 
connect.connectorStatus(CONNECTOR_NAME);
+                    boolean connectorRunning = 
"RUNNING".equals(status.connector().state());
+                    boolean allTasksRunning = status.tasks().stream()
+                            .allMatch(t -> "RUNNING".equals(t.state()));
+                    boolean expectedNumTasks = status.tasks().size() == 
tasksMax;
+                    if (connectorRunning && allTasksRunning && 
expectedNumTasks) {
+                        return true;
+                    } else {
+                        if (!connectorRunning) {
+                            if ("FAILED".equals(status.connector().state())) {
+                                // Only task failures are expected; if the 
connector has failed, something
+                                // else is wrong and we should fail the test 
immediately
+                                throw new NoRetryException(
+                                        new AssertionError("Connector " + 
CONNECTOR_NAME + " has failed unexpectedly")
+                                );
+                            }
+                        }
+                        // Restart all failed tasks
+                        status.tasks().stream()
+                                .filter(t -> "FAILED".equals(t.state()))
+                                .map(ConnectorStateInfo.TaskState::id)
+                                .forEach(t -> 
connect.restartTask(CONNECTOR_NAME, t));
+                        return false;
+                    }
+                },
+                ConnectAssertions.CONNECTOR_SETUP_DURATION_MS,
+                "Connector and task should have restarted successfully"
+        );
     }
 
     /**
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 0dbea6da2be..d131fd4efc6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -79,7 +79,7 @@ public class ExampleConnectIntegrationTest {
         Properties exampleBrokerProps = new Properties();
         exampleBrokerProps.put("auto.create.topics.enable", "false");
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -100,7 +100,7 @@ public class ExampleConnectIntegrationTest {
         // delete connector handle
         RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
 
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
index ed7a786e1d6..a625dc983e8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
@@ -64,7 +64,7 @@ public class InternalTopicsIntegrationTest {
 
     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect workers and Kafka brokers.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index 4571e584502..0ac514039f0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -120,7 +120,6 @@ public class OffsetsApiIntegrationTest {
 
     @AfterAll
     public static void close() {
-        // stop all Connect, Kafka and Zk threads.
         CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
         // wait for all blocked threads created while testing zombie task 
scenarios to finish
         BlockingConnectorTest.Block.join();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 178697a14d0..ff028928c25 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -87,7 +87,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         Properties brokerProps = new Properties();
         brokerProps.put("auto.create.topics.enable", "false");
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -103,7 +103,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
     @AfterEach
     public void close(TestInfo testInfo) {
         log.info("Finished test {}", testInfo.getDisplayName());
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index b0e1cae1fae..8ccc31baa86 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -67,7 +67,7 @@ public class RestExtensionIntegrationTest {
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(REST_EXTENSION_CLASSES_CONFIG, 
IntegrationTestRestExtension.class.getName());
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
             .name("connect-cluster")
             .numWorkers(NUM_WORKERS)
@@ -135,7 +135,7 @@ public class RestExtensionIntegrationTest {
 
     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
         IntegrationTestRestExtension.instance = null;
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
index a482c352709..7969471918e 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
@@ -64,7 +64,7 @@ public class SessionedProtocolIntegrationTest {
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(CONNECT_PROTOCOL_CONFIG, 
ConnectProtocolCompatibility.SESSIONED.protocol());
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
             .name("connect-cluster")
             .numWorkers(2)
@@ -81,7 +81,7 @@ public class SessionedProtocolIntegrationTest {
 
     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
index c67d0ca30f1..961eeb70f99 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java
@@ -75,7 +75,7 @@ public class SinkConnectorsIntegrationTest {
         brokerProps.put("auto.create.topics.enable", "false");
         brokerProps.put("delete.topic.enable", "true");
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -90,7 +90,7 @@ public class SinkConnectorsIntegrationTest {
         // delete connector handle
         RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
 
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
index 91416cb8441..aa1dc6bcf94 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java
@@ -80,7 +80,7 @@ public class SourceConnectorsIntegrationTest {
         // setup Kafka broker properties
         brokerProps.put("auto.create.topics.enable", String.valueOf(false));
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connectBuilder = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -91,7 +91,7 @@ public class SourceConnectorsIntegrationTest {
 
     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
index 5c8c2f5630f..105d238d56f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
@@ -84,7 +84,7 @@ public class TransformationIntegrationTest {
         // This is required because tests in this class also test 
per-connector topic creation with transformations
         brokerProps.put("auto.create.topics.enable", "false");
 
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -105,7 +105,7 @@ public class TransformationIntegrationTest {
         // delete connector handle
         RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
 
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 38f61b06b2e..018c9a40b05 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -121,7 +121,7 @@ abstract class EmbeddedConnect {
     };
 
     /**
-     * Start the Connect cluster and the embedded Kafka and Zookeeper cluster,
+     * Start the Connect cluster and the embedded Kafka KRaft cluster,
      * and wait for the Kafka and Connect clusters to become healthy.
      */
     public void start() {
@@ -163,7 +163,7 @@ abstract class EmbeddedConnect {
     }
 
     /**
-     * Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
+     * Stop the Connect cluster and the embedded Kafka KRaft cluster.
      * Clean up any temp directories created locally.
      *
      * @throws RuntimeException if Kafka brokers fail to stop
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 8272c294bd3..eab33ef2678 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -41,9 +41,11 @@ import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.STA
 import static 
org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
 
 /**
- * Start an embedded connect cluster. Internally, this class will spin up a 
Kafka and Zk cluster, set up any tmp
- * directories, and clean them up on exit. Methods on the same {@code 
EmbeddedConnectCluster} are
- * not guaranteed to be thread-safe.
+ * Start an embedded Connect cluster that can be used for integration tests. 
Internally, this class also spins up a
+ * backing Kafka KRaft cluster for the Connect cluster leveraging {@link 
kafka.testkit.KafkaClusterTestKit}. Methods
+ * on the same {@code EmbeddedConnectCluster} are not guaranteed to be 
thread-safe. This class also provides various
+ * utility methods to perform actions on the Connect cluster such as connector 
creation, config validation, connector
+ * restarts, pause / resume, connector deletion etc.
  */
 public class EmbeddedConnectCluster extends EmbeddedConnect {
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java
index 3cbbca5ebdf..66ce78d0d1b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java
@@ -47,7 +47,7 @@ import static 
org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_C
 import static 
org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG;
 
 /**
- * Start a standalone embedded connect worker. Internally, this class will 
spin up a Kafka and Zk cluster,
+ * Start a standalone embedded connect worker. Internally, this class will 
spin up a Kafka cluster,
  * set up any tmp directories. and clean them up on exit. Methods on the same
  * {@code EmbeddedConnectStandalone} are not guaranteed to be thread-safe.
  */
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index e02c69fc43c..37d130bbf8b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -16,12 +16,9 @@
  */
 package org.apache.kafka.connect.util.clusters;
 
-import kafka.cluster.EndPoint;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.CoreUtils;
-import kafka.utils.TestUtils;
-import kafka.zk.EmbeddedZookeeper;
+import kafka.server.BrokerServer;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
@@ -49,23 +46,17 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.metadata.BrokerState;
 import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ServerConfigs;
-import org.apache.kafka.server.config.ZkConfigs;
-import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.server.config.ServerLogConfigs;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.file.Files;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -83,6 +74,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -95,98 +87,62 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = Time.SYSTEM;
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
+    private final Map<String, String> clientConfigs;
 
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
-        this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-        // a listener config is defined during initialization in order to know 
if it's
-        // safe to override it
-        hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+                                   final Properties brokerConfig,
+                                   final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCombined(true)
+                            .setNumBrokerNodes(numBrokers)
+                            // Reduce number of controllers for faster startup
+                            // We may make this configurable in the future if 
there's a use case for it
+                            .setNumControllerNodes(1)
+                            .build()
+            );
 
+            brokerConfig.forEach((k, v) -> 
clusterBuilder.setConfigProp((String) k, v));
+            cluster = clusterBuilder.build();
+            cluster.nonFatalFaultHandler().setIgnore(true);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to create test Kafka cluster", 
e);
+        }
+        this.brokerConfig = brokerConfig;
         this.clientConfigs = clientConfigs;
     }
 
-    /**
-     * Starts the Kafka cluster alone using the ports that were assigned 
during initialization of
-     * the harness.
-     *
-     * @throws ConnectException if a directory to store the data cannot be 
created
-     */
-    public void startOnlyKafkaOnSamePorts() {
-        doStart();
-    }
-
     public void start() {
-        // pick a random port
-        zookeeper = new EmbeddedZookeeper();
-        Arrays.fill(currentBrokerPorts, 0);
-        Arrays.fill(currentBrokerLogDirs, null);
-        doStart();
-    }
-
-    private void doStart() {
-        brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString());
-
-        putIfAbsent(brokerConfig, ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, 
true);
-        putIfAbsent(brokerConfig, 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
-        putIfAbsent(brokerConfig, 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 
brokers.length);
-        putIfAbsent(brokerConfig, AUTO_CREATE_TOPICS_ENABLE_CONFIG, false);
-        // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
-
-        Object listenerConfig = 
brokerConfig.get(INTER_BROKER_LISTENER_NAME_CONFIG);
-        if (listenerConfig == null)
-            listenerConfig = 
brokerConfig.get(INTER_BROKER_SECURITY_PROTOCOL_CONFIG);
-        if (listenerConfig == null)
-            listenerConfig = "PLAINTEXT";
-        listenerName = new ListenerName(listenerConfig.toString());
-
-        for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(ServerConfigs.BROKER_ID_CONFIG, i);
-            currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? 
createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(LOG_DIR_CONFIG, currentBrokerLogDirs[i]);
-            if (!hasListenerConfig)
-                brokerConfig.put(SocketServerConfigs.LISTENERS_CONFIG, 
listenerName.value() + "://localhost:" + currentBrokerPorts[i]);
-            brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, 
true), time);
-            currentBrokerPorts[i] = brokers[i].boundPort(listenerName);
+        try {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to start test Kafka cluster", 
e);
         }
 
         Map<String, Object> producerProps = new HashMap<>(clientConfigs);
@@ -199,149 +155,65 @@ public class EmbeddedKafkaCluster {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        maybeShutDownProducer();
-        triggerBrokerShutdown();
-        awaitBrokerShutdown();
-
-        if (deleteLogDirs)
-            deleteLogDirs();
-
-        if (stopZK)
-            stopZK();
-    }
-
-    private void maybeShutDownProducer() {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-    }
-
-    private void triggerBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void awaitBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.awaitShutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Failed while awaiting shutdown of 
broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void deleteLogDirs() {
-        for (KafkaServer broker : brokers) {
-            try {
-                log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                CoreUtils.delete(broker.config().logDirs());
-            } catch (Throwable t) {
-                String msg = String.format("Could not clean up log dirs for 
broker at %s",
-                        address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void stopZK() {
-        try {
-            zookeeper.shutdown();
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use 
a fixed non-zero free port. Also note that this is
+     * only possible when {@code numBrokers} is 1.
+     */
+    public void restartOnlyBrokers() {
+        cluster.brokers().values().forEach(BrokerServer::startup);
     }
 
-    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can 
be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        cluster.brokers().values().forEach(BrokerServer::shutdown);
+        cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
     }
 
-    private String createLogDir() {
-        try {
-            return 
Files.createTempDirectory(getClass().getSimpleName()).toString();
-        } catch (IOException e) {
-            log.error("Unable to create temporary log directory", e);
-            throw new ConnectException("Unable to create temporary log 
directory", e);
+    public void stop() {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        Utils.closeQuietly(producer, "producer for embedded Kafka cluster", 
shutdownFailure);
+        Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+        if (shutdownFailure.get() != null) {
+            throw new ConnectException("Failed to shut down producer / 
embedded Kafka cluster", shutdownFailure.get());
         }
     }
 
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();
     }
 
     /**
      * Get the brokers that have a {@link BrokerState#RUNNING} state.
      *
-     * @return the list of {@link KafkaServer} instances that are running;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances that are running;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> runningBrokers() {
-        return brokersInState(state -> state == BrokerState.RUNNING);
+    public Set<BrokerServer> runningBrokers() {
+        return brokersInState(BrokerState.RUNNING::equals);
     }
 
     /**
      * Get the brokers whose state match the given predicate.
      *
-     * @return the list of {@link KafkaServer} instances with states that 
match the predicate;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances with states that 
match the predicate;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> brokersInState(Predicate<BrokerState> 
desiredState) {
-        return Arrays.stream(brokers)
-                     .filter(b -> hasState(b, desiredState))
-                     .collect(Collectors.toSet());
+    public Set<BrokerServer> brokersInState(Predicate<BrokerState> 
desiredState) {
+        return cluster.brokers().values().stream()
+                .filter(b -> desiredState.test(b.brokerState()))
+                .collect(Collectors.toSet());
     }
 
-    protected boolean hasState(KafkaServer server, Predicate<BrokerState> 
desiredState) {
-        try {
-            return desiredState.test(server.brokerState());
-        } catch (Throwable e) {
-            // Broker failed to respond.
-            return false;
-        }
-    }
-    
     public boolean sslEnabled() {
-        final String listeners = 
brokerConfig.getProperty(SocketServerConfigs.LISTENERS_CONFIG);
-        return listeners != null && listeners.contains("SSL");
+        final String listenerSecurityProtocolMap = 
brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG);
+        if (listenerSecurityProtocolMap == null)
+            return false;
+        return listenerSecurityProtocolMap.contains(":SSL") || 
listenerSecurityProtocolMap.contains(":SASL_SSL");
     }
 
     /**
@@ -447,9 +319,9 @@ public class EmbeddedKafkaCluster {
      * @param adminClientConfig Additional admin client configuration settings.
      */
     public void createTopic(String topic, int partitions, int replication, 
Map<String, String> topicConfig, Map<String, Object> adminClientConfig) {
-        if (replication > brokers.length) {
+        if (replication > cluster.brokers().size()) {
             throw new InvalidReplicationFactorException("Insufficient brokers 
("
-                    + brokers.length + ") for desired replication (" + 
replication + ")");
+                    + cluster.brokers().size() + ") for desired replication (" 
+ replication + ")");
         }
 
         log.info("Creating topic { name: {}, partitions: {}, replication: {}, 
config: {} }",
@@ -498,8 +370,7 @@ public class EmbeddedKafkaCluster {
         Properties props = Utils.mkProperties(clientConfigs);
         props.putAll(adminClientConfig);
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
-        final Object listeners = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG);
-        if (listeners != null && listeners.toString().contains("SSL")) {
+        if (sslEnabled()) {
             props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
             props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
@@ -702,16 +573,16 @@ public class EmbeddedKafkaCluster {
         Map<String, Object> props = new HashMap<>(clientConfigs);
         props.putAll(consumerProps);
 
-        putIfAbsent(props, GROUP_ID_CONFIG, UUID.randomUUID().toString());
-        putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-        putIfAbsent(props, ENABLE_AUTO_COMMIT_CONFIG, "false");
-        putIfAbsent(props, AUTO_OFFSET_RESET_CONFIG, "earliest");
-        putIfAbsent(props, KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        putIfAbsent(props, VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.putIfAbsent(GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        props.putIfAbsent(ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.putIfAbsent(VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
         if (sslEnabled()) {
-            putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-            putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
-            putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SSL");
+            props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+            props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+            props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SSL");
         }
         KafkaConsumer<byte[], byte[]> consumer;
         try {
@@ -731,13 +602,13 @@ public class EmbeddedKafkaCluster {
     public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> 
producerProps) {
         Map<String, Object> props = new HashMap<>(clientConfigs);
         props.putAll(producerProps);
-        putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-        putIfAbsent(props, KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        putIfAbsent(props, VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        props.putIfAbsent(KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.putIfAbsent(VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         if (sslEnabled()) {
-            putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-            putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
-            putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SSL");
+            props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+            props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+            props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SSL");
         }
         KafkaProducer<byte[], byte[]> producer;
         try {
@@ -748,9 +619,9 @@ public class EmbeddedKafkaCluster {
         return producer;
     }
 
-    private static void putIfAbsent(final Map<String, Object> props, final 
String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int 
numBrokers) {
+        
brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 "0");
+        
brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
 String.valueOf(numBrokers));
+        
brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
"false");
     }
 }
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java 
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 9ac3498d196..e44f5ad5219 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -149,14 +149,14 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
 
     public static class Builder {
         private TestKitNodes nodes;
-        private Map<String, String> configProps = new HashMap<>();
-        private SimpleFaultHandlerFactory faultHandlerFactory = new 
SimpleFaultHandlerFactory();
+        private final Map<String, Object> configProps = new HashMap<>();
+        private final SimpleFaultHandlerFactory faultHandlerFactory = new 
SimpleFaultHandlerFactory();
 
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
         }
 
-        public Builder setConfigProp(String key, String value) {
+        public Builder setConfigProp(String key, Object value) {
             this.configProps.put(key, value);
             return this;
         }
@@ -165,7 +165,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             BrokerNode brokerNode = nodes.brokerNodes().get(node.id());
             ControllerNode controllerNode = 
nodes.controllerNodes().get(node.id());
 
-            Map<String, String> props = new HashMap<>(configProps);
+            Map<String, Object> props = new HashMap<>(configProps);
             props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
                     Long.toString(TimeUnit.MINUTES.toMillis(10)));
             props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, roles(node.id()));
@@ -188,13 +188,16 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
                 props.put(LOG_DIRS_CONFIG,
                     controllerNode.metadataDirectory());
             }
-            
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-                    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
listeners(node.id()));
-            props.put(INTER_BROKER_LISTENER_NAME_CONFIG,
-                    nodes.interBrokerListenerName().value());
-            props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
-                    "CONTROLLER");
+
+            // We allow configuring the listeners and related properties via 
Builder::setConfigProp,
+            // and they shouldn't be overridden here
+            
props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+                "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+            props.putIfAbsent(SocketServerConfigs.LISTENERS_CONFIG, 
listeners(node.id()));
+            props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG,
+                nodes.interBrokerListenerName().value());
+            props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER");
+
             // Note: we can't accurately set controller.quorum.voters yet, 
since we don't
             // yet know what ports each controller will pick.  Set it to a 
dummy string
             // for now as a placeholder.
@@ -257,7 +260,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                                 nodes.bootstrapMetadata());
                     } catch (Throwable e) {
                         log.error("Error creating controller {}", node.id(), 
e);
-                        Utils.swallow(log, Level.WARN, 
"sharedServer.stopForController error", () -> sharedServer.stopForController());
+                        Utils.swallow(log, Level.WARN, 
"sharedServer.stopForController error", sharedServer::stopForController);
                         throw e;
                     }
                     controllers.put(node.id(), controller);
@@ -288,7 +291,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         broker = new BrokerServer(sharedServer);
                     } catch (Throwable e) {
                         log.error("Error creating broker {}", node.id(), e);
-                        Utils.swallow(log, Level.WARN, 
"sharedServer.stopForBroker error", () -> sharedServer.stopForBroker());
+                        Utils.swallow(log, Level.WARN, 
"sharedServer.stopForBroker error", sharedServer::stopForBroker);
                         throw e;
                     }
                     brokers.put(node.id(), broker);


Reply via email to