yashmayya commented on code in PR #13375: URL: https://github.com/apache/kafka/pull/13375#discussion_r1147210852
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ########## @@ -656,6 +567,19 @@ public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producer return producer; } + private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) { + putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), "true"); + putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0"); + putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers)); + putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), "false"); + } + + private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { + if (!props.containsKey(propertyKey)) { + props.put(propertyKey, propertyValue); + } + } Review Comment: Ah whoops, my bad! I can go ahead and clean up the map variant in this PR as well if you'd like? ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ########## @@ -89,96 +82,62 @@ import static org.junit.Assert.assertFalse; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); - private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); + private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); - // Kafka Config - private final KafkaServer[] brokers; + private final KafkaClusterTestKit cluster; private final Properties brokerConfig; - private final Time time = new MockTime(); - private final int[] currentBrokerPorts; - private final String[] currentBrokerLogDirs; - private final boolean hasListenerConfig; - - final Map<String, String> clientConfigs; - - private EmbeddedZookeeper zookeeper = null; - private ListenerName listenerName = new ListenerName("PLAINTEXT"); + private final Map<String, String> clientConfigs; private KafkaProducer<byte[], byte[]> producer; - public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig) { + public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig, - final Map<String, String> clientConfigs) { - brokers = new KafkaServer[numBrokers]; - currentBrokerPorts = new int[numBrokers]; - currentBrokerLogDirs = new String[numBrokers]; + final Properties brokerConfig, + final Map<String, String> clientConfigs) { + addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); + try { + KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setCoResident(true) + .setNumBrokerNodes(numBrokers) + .setNumControllerNodes(numBrokers) + .build() + ); + + brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v)); + cluster = clusterBuilder.build(); + cluster.nonFatalFaultHandler().setIgnore(true); + } catch (Exception e) { + throw new ConnectException("Failed to create test Kafka cluster", e); + } this.brokerConfig = brokerConfig; - // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether - // a listener config is defined during initialization in order to know if it's - // safe to override it - hasListenerConfig = brokerConfig.get(KafkaConfig.ListenersProp()) != null; - this.clientConfigs = clientConfigs; } - /** - * Starts the Kafka cluster alone using the ports that were assigned during initialization of - * the harness. - * - * @throws ConnectException if a directory to store the data cannot be created - */ - public void startOnlyKafkaOnSamePorts() { - doStart(); - } - public void start() { - // pick a random port - zookeeper = new EmbeddedZookeeper(); - Arrays.fill(currentBrokerPorts, 0); - Arrays.fill(currentBrokerLogDirs, null); - doStart(); - } - - private void doStart() { - brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString()); - - putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true); - putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0); - putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length); - putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false); - // reduce the size of the log cleaner map to reduce test memory usage - putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); Review Comment: It's handled here - https://github.com/apache/kafka/blob/f79c2a6e04129832eacec60bbc180173ecc37549/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L192-L193 ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ########## @@ -187,120 +146,73 @@ private void doStart() { producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); } - public void stopOnlyKafka() { - stop(false, false); - } - - public void stop() { - stop(true, true); - } - - private void stop(boolean deleteLogDirs, boolean stopZK) { - try { - if (producer != null) { - producer.close(); - } - } catch (Exception e) { - log.error("Could not shutdown producer ", e); - throw new RuntimeException("Could not shutdown producer", e); - } - - for (KafkaServer broker : brokers) { - try { - broker.shutdown(); - } catch (Throwable t) { - String msg = String.format("Could not shutdown broker at %s", address(broker)); - log.error(msg, t); - throw new RuntimeException(msg, t); - } - } - - if (deleteLogDirs) { - for (KafkaServer broker : brokers) { - try { - log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs()); - CoreUtils.delete(broker.config().logDirs()); - } catch (Throwable t) { - String msg = String.format("Could not clean up log dirs for broker at %s", - address(broker)); - log.error(msg, t); - throw new RuntimeException(msg, t); - } - } - } - - try { - if (stopZK) { - zookeeper.shutdown(); - } - } catch (Throwable t) { - String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString()); - log.error(msg, t); - throw new RuntimeException(msg, t); + /** + * Restarts the Kafka brokers. This can be called after {@link #stopOnlyBrokers()}. Note that if the Kafka brokers + * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the + * {@link KafkaConfig#ListenersProp} property and it should use a fixed non-zero free port. + */ + public void restartOnlyBrokers() { + for (BrokerServer broker : cluster.brokers().values()) { + broker.startup(); } } - private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { - if (!props.containsKey(propertyKey)) { - props.put(propertyKey, propertyValue); + /** + * Stop only the Kafka brokers (and not the KRaft controllers). This can be used to test Connect's functionality + * when the backing Kafka cluster goes offline. + */ + public void stopOnlyBrokers() { + for (BrokerServer broker : cluster.brokers().values()) { + broker.shutdown(); + broker.awaitShutdown(); Review Comment: Ah yeah, that's a great point. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org