yashmayya commented on code in PR #13375:
URL: https://github.com/apache/kafka/pull/13375#discussion_r1147210852


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -656,6 +567,19 @@ public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producer
         return producer;
     }
 
+    private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
+        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), "true");
+        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
+        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
+        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), "false");
+    }
+
+    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }

Review Comment:
   Ah whoops, my bad! I can go ahead and clean up the map variant in this PR as 
well if you'd like?
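
   For reference, a minimal sketch of one way that cleanup could look (an assumption on my part, not part of this diff): since java.util.Properties extends Hashtable, which already provides Map#putIfAbsent, the custom helper(s) could potentially be dropped altogether:

       private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
           // Properties inherits putIfAbsent from Hashtable/Map, so no custom
           // static helper is needed to set a default only when the key is absent.
           brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
           brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
           brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
           brokerConfig.putIfAbsent(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
       }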



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -89,96 +82,62 @@
 import static org.junit.Assert.assertFalse;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = new MockTime();
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
-
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
+    private final Map<String, String> clientConfigs;
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
+                                   final Properties brokerConfig,
+                                   final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCoResident(true)
+                            .setNumBrokerNodes(numBrokers)
+                            .setNumControllerNodes(numBrokers)
+                            .build()
+            );
+
+            brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v));
+            cluster = clusterBuilder.build();
+            cluster.nonFatalFaultHandler().setIgnore(true);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to create test Kafka cluster", e);
+        }
         this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether
-        // a listener config is defined during initialization in order to know if it's
-        // safe to override it
-        hasListenerConfig = brokerConfig.get(KafkaConfig.ListenersProp()) != null;
-
         this.clientConfigs = clientConfigs;
     }
 
-    /**
-     * Starts the Kafka cluster alone using the ports that were assigned during initialization of
-     * the harness.
-     *
-     * @throws ConnectException if a directory to store the data cannot be created
-     */
-    public void startOnlyKafkaOnSamePorts() {
-        doStart();
-    }
-
     public void start() {
-        // pick a random port
-        zookeeper = new EmbeddedZookeeper();
-        Arrays.fill(currentBrokerPorts, 0);
-        Arrays.fill(currentBrokerLogDirs, null);
-        doStart();
-    }
-
-    private void doStart() {
-        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
-
-        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
-        // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);

Review Comment:
   It's handled here - 
https://github.com/apache/kafka/blob/f79c2a6e04129832eacec60bbc180173ecc37549/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L192-L193
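
   As additional context for the rewritten class Javadoc above, here is a minimal usage sketch. The createTopic/produce calls and their exact signatures are assumptions based on the Javadoc's mention of utility methods; they are not taken from this diff:

       import java.util.Properties;
       import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;

       public class EmbeddedKafkaClusterUsageSketch {
           public static void main(String[] args) {
               // Defaults from addDefaultBrokerPropsIfAbsent apply unless overridden in these Properties.
               EmbeddedKafkaCluster kafka = new EmbeddedKafkaCluster(1, new Properties());
               kafka.start();                                    // boots the KRaft cluster via KafkaClusterTestKit
               kafka.createTopic("connect-it-topic");            // assumed utility method, per the Javadoc
               kafka.produce("connect-it-topic", "some-record"); // assumed utility method, per the Javadoc
               kafka.stop();                                     // tears the cluster down
           }
       }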



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link #stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed non-zero free port.
+     */
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();
         }
     }
 
-    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.shutdown();
+            broker.awaitShutdown();

Review Comment:
   Ah yeah, that's a great point. Thanks!
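
   To illustrate the ListenersProp note in the restartOnlyBrokers() Javadoc above, a hedged sketch (the port is only illustrative and must actually be free on the test machine):

       Properties brokerProps = new Properties();
       // Pin the broker listener to a fixed free port so that restartOnlyBrokers()
       // brings the broker back on the same address after stopOnlyBrokers().
       // "listeners" is the config key behind KafkaConfig.ListenersProp().
       brokerProps.put("listeners", "PLAINTEXT://localhost:9092");
       EmbeddedKafkaCluster kafka = new EmbeddedKafkaCluster(1, brokerProps);
       kafka.start();
       kafka.stopOnlyBrokers();
       kafka.restartOnlyBrokers();  // expected to come back up on localhost:9092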


