cmccabe commented on code in PR #12837:
URL: https://github.com/apache/kafka/pull/12837#discussion_r1022030312

##########
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##########
@@ -159,53 +218,30 @@ public KafkaClusterTestKit build() throws Exception {
             executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
                 ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
             for (ControllerNode node : nodes.controllerNodes().values()) {
-                Map<String, String> props = new HashMap<>(configProps);
-                props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
-                props.put(KafkaConfig$.MODULE$.NodeIdProp(),
-                    Integer.toString(node.id()));
-                props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
-                    node.metadataDirectory());
-                props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-                    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-                props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
-                props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
-                    nodes.interBrokerListenerName().value());
-                props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
-                    "CONTROLLER");
-                // Note: we can't accurately set controller.quorum.voters yet, since we don't
-                // yet know what ports each controller will pick. Set it to a dummy string \
-                // for now as a placeholder.
-                props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
-
-                // reduce log cleaner offset map memory usage
-                props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), "2097152");
-
                 setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
-                KafkaConfig config = new KafkaConfig(props, false, Option.empty());
-
-                String threadNamePrefix = String.format("controller%d_", node.id());
-                MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
-                TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
                 BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
                     fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
-                KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
-                    metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
-                    Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
-                ControllerServer controller = new ControllerServer(
-                    nodes.controllerProperties(node.id()),
-                    config,
-                    raftManager,
-                    Time.SYSTEM,
-                    new Metrics(),
-                    new MockControllerMetrics(),
-                    Option.apply(threadNamePrefix),
-                    connectFutureManager.future,
-                    KafkaRaftServer.configSchema(),
-                    raftManager.apiVersions(),
-                    bootstrapMetadata,
-                    metadataFaultHandler,
-                    fatalFaultHandler
-                );
+                String threadNamePrefix = (nodes.brokerNodes().containsKey(node.id())) ?
+                    String.format("colocated%d", node.id()) :
+                    String.format("controller%d", node.id());
+                JointServer jointServer = new JointServer(createNodeConfig(node),

Review Comment:
It's not simple at all to use KafkaRaftServer in most of our tests. Let me give an example. If someone shuts down a broker in a test by calling BrokerServer#shutdown, and the broker was a standalone broker, you have to somehow shut down the associated KafkaRaftServer, the associated snapshot generator, and the associated metadata loader, and also clear the associated dynamic metadata. If you maintain KafkaRaftServer more or less the way it is, where it owns a BrokerServer, a ControllerServer, and some other stuff, and those owned objects don't have any pointers back to it, this is not really possible. You would have to either rewrite the tests in terms of KafkaRaftServer, which is not really feasible in the time we have available, or just accept that BrokerServer#shutdown is not going to clean up everything. I don't think either course of action really works here. In general we haven't tested combined mode very much, so we've been able to handwave some of this, or just accept resource leaks in the tests.

But to do it correctly, we should acknowledge that in combined mode there is some joint state. Hence, JointServer. I think this PR greatly simplifies the test code (and will do so for the other test harnesses we have). We cannot have each test harness manually managing the joint state; it is just too much (and it grows over time). This is a clean way to handle that: standalone mode = your own JointServer, combined mode = shared JointServer.
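
To make the "your own vs. shared" idea concrete, here is a minimal sketch of one way such joint state could be managed: each server holds a reference to a shared object, and the shared resources are released only when the last holder shuts down. This is not the JointServer code from the PR. `JointStateSketch`, `ServerSketch`, and the resource names recorded in `released` are hypothetical placeholders chosen for illustration, and reference counting is an assumption about how the cleanup could work.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the state jointly owned by a broker and a controller. */
final class JointStateSketch {
    private int users = 0;
    private boolean closed = false;
    // Records what was cleaned up; a real implementation would close actual resources.
    final List<String> released = new ArrayList<>();

    /** Registers one more server that depends on this joint state. */
    synchronized JointStateSketch acquire() {
        if (closed) {
            throw new IllegalStateException("joint state already closed");
        }
        users++;
        return this;
    }

    /** Drops one user; the last user to leave triggers the cleanup. */
    synchronized void release() {
        if (users == 0) {
            throw new IllegalStateException("release without matching acquire");
        }
        users--;
        if (users == 0) {
            closed = true;
            released.add("raftManager");        // placeholder names only
            released.add("snapshotGenerator");
            released.add("metadataLoader");
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}

/** Hypothetical broker or controller that keeps a pointer back to its joint state. */
final class ServerSketch {
    private final JointStateSketch joint;
    private boolean running = true;

    ServerSketch(JointStateSketch joint) {
        this.joint = joint.acquire();
    }

    /** Idempotent shutdown: releases this server's claim on the joint state once. */
    void shutdown() {
        if (!running) {
            return;
        }
        running = false;
        joint.release();
    }
}
```

In this sketch a standalone broker is constructed with its own `JointStateSketch`, so its `shutdown()` cleans up everything. In combined mode the broker and controller are constructed with the same instance, so cleanup happens only after both have shut down, and the test harness never has to track the joint state itself.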