becketqin commented on a change in pull request #12589:
URL: https://github.com/apache/flink/pull/12589#discussion_r439159451



##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/kafka/KafkaStandaloneGenerator.java
##########
@@ -90,7 +91,7 @@ public void collect(Event evt) {
 
                @Override
                public void close() {
-                       producer.close();
+                       producer.close(Duration.ofSeconds(0));

Review comment:
       Should we also flush the producer here?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -1182,7 +1208,7 @@ int getTransactionCoordinatorId() {
 
        private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
                availableTransactionalIds.add(producer.getTransactionalId());
-               producer.close();
+               producer.close(Duration.ofSeconds(0));

Review comment:
       Can we also flush the producer here? It probably does not matter at this
point, because this method is invoked only after committing or aborting a
transaction, but I would rather be safe in case it is called from other
places in the future.
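
To illustrate the suggestion, here is a minimal sketch of the flush-then-close ordering. It assumes a hypothetical `ProducerLike` interface that stands in for the `flush()` and `close(Duration)` methods of the Kafka producer, so that it runs without the Kafka client on the classpath. `flushAndClose` and `RecordingProducer` are likewise illustrative names, not part of this PR. Whether `close` should still run when `flush` throws is a design choice; the sketch uses `finally` so the producer is always released.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class FlushBeforeClose {

    /** Hypothetical stand-in for the two producer methods discussed in this review. */
    interface ProducerLike {
        void flush();
        void close(Duration timeout);
    }

    /**
     * Flushes buffered records first, then closes with a zero timeout.
     * Without the flush, a zero-timeout close may drop records that are
     * still buffered or in flight.
     */
    static void flushAndClose(ProducerLike producer) {
        try {
            producer.flush();
        } finally {
            // Always release the producer, even if flush() throws.
            producer.close(Duration.ZERO);
        }
    }

    /** Fake producer that records the calls it receives; for illustration only. */
    static final class RecordingProducer implements ProducerLike {
        final List<String> calls = new ArrayList<>();

        @Override
        public void flush() {
            calls.add("flush");
        }

        @Override
        public void close(Duration timeout) {
            calls.add("close(" + timeout + ")");
        }
    }

    public static void main(String[] args) {
        RecordingProducer producer = new RecordingProducer();
        flushAndClose(producer);
        System.out.println(producer.calls); // prints [flush, close(PT0S)]
    }
}
```

In the two call sites above, this would amount to calling `producer.flush()` immediately before `producer.close(Duration.ofSeconds(0))`.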





