divijvaidya commented on code in PR #12228:
URL: https://github.com/apache/kafka/pull/12228#discussion_r952807299


##########
clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java:
##########
@@ -54,12 +54,18 @@ public void configure(Map<String, ?> configs) throws 
KafkaException {
     @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int 
maxReceiveSize,
                                      MemoryPool memoryPool, 
ChannelMetadataRegistry metadataRegistry) throws KafkaException {
+        PlaintextTransportLayer transportLayer = null;
         try {
-            PlaintextTransportLayer transportLayer = buildTransportLayer(key);
-            Supplier<Authenticator> authenticatorCreator = () -> new 
PlaintextAuthenticator(configs, transportLayer, listenerName);
+            transportLayer = buildTransportLayer(key);
+            final PlaintextTransportLayer finalTransportLayer = transportLayer;
+            Supplier<Authenticator> authenticatorCreator = () -> new 
PlaintextAuthenticator(configs, finalTransportLayer, listenerName);
             return buildChannel(id, transportLayer, authenticatorCreator, 
maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE, 
metadataRegistry);
         } catch (Exception e) {
+            // Ideally these resources are closed by the KafkaChannel but this 
builder should close the resources instead
+            // if an error occurs due to which KafkaChannel is not created.
+            Utils.closeQuietly(transportLayer, "transport layer for channel 
Id: " + id);
+            Utils.closeQuietly(metadataRegistry, "metadataRegistry");

Review Comment:
   > Shouldn't that take care of closing the metadata registry since it 
actually creates it?
   
   I agree, I missed it earlier. I have moved the responsibility of closing in 
case of errors to `Selector.buildAndAttachKafkaChannel`. I have also added a 
unit test that fails before this fix and passes afterwards.
   
   > With regards to the transport layer, I wonder if we could move the cleanup 
logic to `KafkaChannel` instead
   
   I am afraid that might not be possible because we are handling scenarios 
where we encounter an error while creating the `KafkaChannel` itself (e.g. a 
scenario where authenticatorCreator throws an error in the constructor). We can 
close it uniformly at `ChannelBuilder` but that would require a bit of 
refactoring involving conversion of `ChannelBuiler` interface into a class. 
Please let me know if you have any ideas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to