yashmayya commented on code in PR #13375:
URL: https://github.com/apache/kafka/pull/13375#discussion_r1144234357
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -152,11 +154,6 @@ public void stop() {
         connectCluster.forEach(this::stopWorker);
         try {
             kafkaCluster.stop();
-        } catch (UngracefulShutdownException e) {
-            log.warn("Kafka did not shutdown gracefully");

Review Comment:
   It's being handled here - https://github.com/yashmayya/kafka/blob/f0f4bb9a30ed3fcb4692f16185df0dce4a032efd/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L175-L179

##########
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##########
@@ -174,13 +174,17 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
                 props.put(KafkaConfig$.MODULE$.LogDirsProp(), String.join(",", brokerNode.logDataDirectories()));
             }
-            props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-                "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-            props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
-            props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
-                nodes.interBrokerListenerName().value());
-            props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
-                "CONTROLLER");
+
+            // listeners could be defined via Builder::setConfigProp which shouldn't be overridden
+            if (!props.containsKey(KafkaConfig$.MODULE$.ListenersProp())) {

Review Comment:
   Hm, that's a good point, and I did ponder this one as well. The alternative would be to parse the user-defined listeners here and either verify that the security protocol map, inter-broker listener, and controller listener are all set appropriately, or else set them ourselves. That would involve making sure that the protocols, SASL mechanisms, etc. all match up properly, which seems potentially error-prone. (I tried a setup with broker listeners and controller listeners using different security protocols or mechanisms and wasn't able to get it to work. I'm not sure whether something like that is even currently supported.)
   I'm assuming that users would most likely set custom listeners because they want non-plaintext listeners, at which point it isn't much extra work to also set up the controller listener accordingly (and there are multiple examples of how to do so in various integration tests). What do you think?

##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -195,7 +197,22 @@ public void testRestartFailedTask() throws Exception {
     public void testBrokerCoordinator() throws Exception {
         ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
         workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
-        connect = connectBuilder.workerProps(workerProps).build();
+        Properties brokerProps = new Properties();
+
+        // Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
+        // config to get a random free port because in this test we want to stop the Kafka broker and then bring it
+        // back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
+        // and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
+        // on a different random free port the second time it is started.

Review Comment:
   Sorry, my bad for not clarifying at the outset. There are a couple of reasons. One is that the previous implementation seemed fairly messy: we checked the bound ports after broker startup, then modified the broker config to use those static ports in the listener configuration on broker restarts, and did so only if the user hadn't already configured listeners. So, for instance, `startOnlyKafkaOnSamePorts` would not work as expected if the user defined a SASL or SSL listener with port 0.
   The other reason is that the `KafkaClusterTestKit` being leveraged here would need some refactoring to allow changing broker configs after it has been instantiated. Currently, it only supports customising broker configs in its builder. All in all, it seems a lot cleaner to move the responsibility for using a fixed port in the listeners configuration to the clients of the embedded Kafka cluster, in case they want to test functionality involving offline brokers.
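
For illustration, here is a minimal sketch of what "finding a free port and using it in the listeners config" could look like on the client side. The class and method names (`FixedPortListeners`, `findFreePort`, `brokerPropsWithFixedPort`) and the `PLAINTEXT` listener name are hypothetical and not taken from the PR. Only the `listeners` key is a real broker config name. The controller listener settings that would also be needed alongside a custom listener are deliberately left out.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;

final class FixedPortListeners {

    // Bind to port 0 so the OS picks a free port, read it back, then release it.
    // Caveat: another process could take the port before the broker binds to it.
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    // Build broker properties whose listener uses an explicit port instead of 0,
    // so a broker restarted with the same properties listens on the same address.
    // The "PLAINTEXT" listener name is an assumption made for this sketch.
    static Properties brokerPropsWithFixedPort(int port) {
        Properties brokerProps = new Properties();
        brokerProps.setProperty("listeners", "PLAINTEXT://localhost:" + port);
        return brokerProps;
    }

    public static void main(String[] args) throws IOException {
        int port = findFreePort();
        Properties brokerProps = brokerPropsWithFixedPort(port);
        System.out.println(brokerProps.getProperty("listeners"));
    }
}
```

Because the port is chosen once and written into the properties, the same value is reused across a stop and restart, which is what a test involving offline brokers relies on. The probe-then-release approach is inherently racy, so this is only suitable for tests.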