GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68085762
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -113,6 +123,14 @@
        /** Errors encountered in the async producer are stored here */
        protected transient volatile Exception asyncException;
     
    +   /**
    +    * Number of unacknowledged records.
    +    * There is no need to introduce additional locks because invoke() and snapshotState() are
    +    * never called concurrently. So blocking the snapshotting will lock the invoke() method until all
    +    * pending records have been confirmed.
    +    */
    --- End diff --
    
    I think the fact that `invoke` and `snapshotState` are mutually exclusive is not relevant to the semantics of the `pendingRecords` variable. The reason is that it is only incremented in `invoke` and only decremented in the callbacks of the Kafka producer.
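
To illustrate the point, here is a minimal sketch of the increment/decrement pattern. It is not the code from this pull request: the class `PendingRecordsSketch`, the simulated acknowledgement thread, and the choice of `AtomicLong` are all assumptions made for the example. It only shows that the counter is shared between the thread calling `invoke` and the thread running the producer callbacks, whether or not `invoke` and `snapshotState` ever overlap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a sink; NOT the real FlinkKafkaProducerBase.
class PendingRecordsSketch {

    // Records handed to the producer but not yet acknowledged.
    // AtomicLong is an assumption of this sketch, not the PR's choice.
    private final AtomicLong pendingRecords = new AtomicLong();

    // Simulates the producer's I/O thread, on which callbacks run.
    private final ExecutorService ackThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "simulated-producer-io");
        t.setDaemon(true);
        return t;
    });

    // Called by the task thread for every record: the only place that increments.
    // The record payload is ignored here; a real sink would send it.
    void invoke(String record) {
        pendingRecords.incrementAndGet();
        ackThread.execute(this::onAcknowledged);
    }

    // Stand-in for the producer callback: the only place that decrements.
    private void onAcknowledged() {
        pendingRecords.decrementAndGet();
    }

    // Blocks until every record sent so far has been acknowledged.
    void snapshotState() {
        try {
            while (pendingRecords.get() > 0) {
                Thread.sleep(1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for acknowledgements", e);
        }
    }

    long pendingRecords() {
        return pendingRecords.get();
    }

    void close() {
        ackThread.shutdown();
    }

    public static void main(String[] args) {
        PendingRecordsSketch sink = new PendingRecordsSketch();
        for (int i = 0; i < 1000; i++) {
            sink.invoke("record-" + i);
        }
        sink.snapshotState();
        // prints "pending after snapshot: 0"
        System.out.println("pending after snapshot: " + sink.pendingRecords());
        sink.close();
    }
}
```

In this sketch the increment and the decrement happen on different threads, so the counter needs some form of thread-safe access regardless of how `invoke` and `snapshotState` are scheduled relative to each other.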

