AHeise commented on code in PR #126:
URL: https://github.com/apache/flink-connector-kafka/pull/126#discussion_r1792410015


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java:
##########
@@ -1246,8 +1230,18 @@ private Set<String> generateNewTransactionalIds() {
     }
 
     @Override
-    protected void finishRecoveringContext(
-            Collection<FlinkKafkaProducer.KafkaTransactionState> handledTransactions) {
+    protected void finishProcessing(@Nullable KafkaTransactionState transaction) {
+        super.finishProcessing(transaction);
+        // TwoPhaseCommitSink sets transaction = null on final checkpoint and thus closing will leak
+        // the producer. For transactional producers, we track the producer in pendingTransactions.
+        if (transaction != null && !transaction.isTransactional()) {
+            transaction.producer.flush();
+            transaction.producer.close(Duration.ZERO);
+        }
+    }

Review Comment:
   > I feel uncomfortable merging this change as part of a "fix-tests-ticket". 
   
   I rephrased the ticket. The PR never claimed to be test-only, and it fits the theme: we basically roll out the leak detection to all tests and fix what we find.
   
   > This is a change in production code to more or less support 
checkpoints-after-task-finish for the deprecated sink function.
   
   No, this only fixes the leak so that the tests run stably. The only reason the tests passed at all is that we rerun them a couple of times by default. The only plausible alternative to fixing the leaks is to remove the leak detection altogether.
   
   Also, can you elaborate on why fixing a deprecated but still supported feature causes concerns? We have at least 2 releases depending on it, and we shouldn't compromise on stable CI imho.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.