cmccabe commented on code in PR #12837:
URL: https://github.com/apache/kafka/pull/12837#discussion_r1022030312


##########
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##########
@@ -159,53 +218,30 @@ public KafkaClusterTestKit build() throws Exception {
                 executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
                     ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
                 for (ControllerNode node : nodes.controllerNodes().values()) {
-                    Map<String, String> props = new HashMap<>(configProps);
-                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
-                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
-                        Integer.toString(node.id()));
-                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
-                        node.metadataDirectory());
-                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-                    props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
-                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
-                        nodes.interBrokerListenerName().value());
-                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
-                        "CONTROLLER");
-                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
-                    // yet know what ports each controller will pick.  Set it to a dummy string \
-                    // for now as a placeholder.
-                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
-
-                    // reduce log cleaner offset map memory usage
-                    props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), "2097152");
-
                     setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
-                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
-
-                    String threadNamePrefix = String.format("controller%d_", node.id());
-                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
-                    TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
                     BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
                         fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
-                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
-                        metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
-                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
-                    ControllerServer controller = new ControllerServer(
-                        nodes.controllerProperties(node.id()),
-                        config,
-                        raftManager,
-                        Time.SYSTEM,
-                        new Metrics(),
-                        new MockControllerMetrics(),
-                        Option.apply(threadNamePrefix),
-                        connectFutureManager.future,
-                        KafkaRaftServer.configSchema(),
-                        raftManager.apiVersions(),
-                        bootstrapMetadata,
-                        metadataFaultHandler,
-                        fatalFaultHandler
-                    );
+                    String threadNamePrefix = (nodes.brokerNodes().containsKey(node.id())) ?
+                            String.format("colocated%d", node.id()) :
+                            String.format("controller%d", node.id());
+                    JointServer jointServer = new JointServer(createNodeConfig(node),

Review Comment:
   It's not simple at all to use KafkaRaftServer in most of our tests. Let me
give an example. If someone shuts down a broker in a test by calling
BrokerServer#shutdown, and that broker was a standalone broker, you somehow have
to shut down the associated KafkaRaftServer, the associated snapshot generator,
and the associated metadata loader, and also clear the associated dynamic
metadata.
   
   If you maintain KafkaRaftServer more or less the way it is, where it owns a 
BrokerServer, ControllerServer, and some other stuff, and those owned objects 
don't have any pointers back to it, this is not really possible. You would have 
to either rewrite the tests in terms of KafkaRaftServer, which is not really 
feasible in the time we have available, or just accept that 
BrokerServer#shutdown is not going to clean up everything. I don't think either 
course of action really works here.
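
   To make the problem concrete, here is a rough sketch of what a test harness
ends up doing today when it holds a standalone broker. The stub types and names
below are invented for illustration; they are not Kafka's real classes. The
point is only that none of these pieces are reachable from BrokerServer itself:

```java
// Illustration only -- stub fields, not Kafka's real classes. The enclosing
// server (KafkaRaftServer in the real code) owns everything, and the owned
// pieces have no pointers back to it.
final class EnclosingServerStub {
    final AutoCloseable broker = () -> { };            // BrokerServer stand-in
    final AutoCloseable raftManager = () -> { };       // raft manager stand-in
    final AutoCloseable snapshotGenerator = () -> { }; // snapshot generator stand-in
    final AutoCloseable metadataLoader = () -> { };    // metadata loader stand-in

    // What "shut down one standalone broker" really means for a test harness:
    // the harness, not the broker, has to remember and close everything else.
    void shutdownStandaloneBroker() throws Exception {
        broker.close();
        metadataLoader.close();
        snapshotGenerator.close();
        raftManager.close();
    }
}
```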
   
   In general we haven't tested combined mode very much, so we've been able to
handwave some of this, or just accept resource leaks in the tests. But to do it
correctly, we should acknowledge that in combined mode there is some joint
state. Hence, JointServer.
   
   I think this PR greatly simplifies the test code (and will do so for the
other test harnesses we have). We cannot have each test harness manually
managing the joint state; it is just too much, and it grows over time. This is
a clean way to do that: standalone mode = your own JointServer, combined mode =
shared JointServer.
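
   A minimal sketch of that ownership model follows. The names (SharedPieces,
acquire, release) are my own simplifications for illustration, not the actual
JointServer API: each BrokerServer/ControllerServer holds a reference to the
shared object, and the shared pieces are only torn down when the last user
shuts down. In standalone mode a server simply gets its own instance; in
combined mode the broker and controller are handed the same one.

```java
// Sketch only: a reference-counted holder for the state that the broker and
// controller share in combined mode. Names are invented for illustration.
final class SharedPieces implements AutoCloseable {
    private int users = 0;
    private boolean started = false;

    // Called from BrokerServer/ControllerServer startup.
    synchronized void acquire() {
        if (!started) {
            started = true;
            // start raft manager, metadata loader, snapshot generator, ...
        }
        users++;
    }

    // Called from BrokerServer#shutdown / ControllerServer#shutdown.
    synchronized void release() throws Exception {
        if (--users == 0) {
            close();
        }
    }

    @Override
    public synchronized void close() throws Exception {
        if (started) {
            started = false;
            // stop metadata loader, snapshot generator, raft manager,
            // and clear any dynamic metadata state.
        }
    }
}
```

   With this shape, BrokerServer#shutdown on a standalone broker releases the
last reference and everything it depends on goes away with it, while in
combined mode the shared state outlives whichever of the two shuts down first.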


