Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2108#discussion_r68085762
--- Diff:
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
---
@@ -113,6 +123,14 @@
/** Errors encountered in the async producer are stored here */
protected transient volatile Exception asyncException;
+ /**
+ * Number of unacknowledged records.
+ * There is no need to introduce additional locks because invoke() and
snapshotState() are
+ * never called concurrently. So blocking the snapshotting will lock
the invoke() method until all
+ * pending records have been confirmed.
+ */
--- End diff --
I think the fact that `invoke` and `snapshotState` are mutually exclusive
is not important for the semantics of the `pendingRecords` variable. The reason
is that it will only be incremented in `invoke` and decremented in the
`callbacks` of the Kafka producer.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---