[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333260#comment-15333260 ]
ASF GitHub Bot commented on FLINK-4027:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2108#discussion_r67296678
--- Diff:
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
---
@@ -276,6 +314,41 @@ public void close() throws Exception {
checkErroneous();
}
+ // ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+ private void acknowledgeMessage() {
+ if(!flushOnCheckpoint) {
+ // the logic is disabled
+ return;
+ }
+ pendingRecords--;
+ }
+
+ @Override
+ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
+ if(flushOnCheckpoint) {
+ // flushing is activated: We need to wait until pendingRecords is 0
+ while(pendingRecords > 0) {
+ try {
+ Thread.sleep(10);
--- End diff --
The problem is that the `flush()` method is only implemented by the Kafka 0.9 producer, not by the 0.8 implementation.
As you can see from the class name, it's the shared base class between the two version-specific implementations. I think for the 0.8 producer, there is no way around the waiting approach.
I'll update the pull request to call `flush()` on the 0.9 producer.
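For the 0.8 path, the waiting approach does not have to poll with `Thread.sleep(10)`. Below is a minimal sketch, assuming a hypothetical `PendingRecords` helper (not part of Flink or Kafka). The count is guarded by the object's monitor because Kafka runs send callbacks on the producer's I/O thread while `snapshotState()` runs on the task thread, and the final acknowledgement wakes the waiting checkpoint with `notifyAll()`. Unless `pendingRecords` is guarded elsewhere in the pull request, the bare `pendingRecords--` in the diff is not atomic across those two threads.

```java
/**
 * Hypothetical helper (not Flink's code): counts records handed to an
 * asynchronous producer that have not yet been acknowledged by a callback.
 */
final class PendingRecords {
    private long pending = 0;

    /** Call just before handing a record to the asynchronous producer. */
    synchronized void increment() {
        pending++;
    }

    /** Call from the send callback, which may run on another thread. */
    synchronized void acknowledge() {
        pending--;
        if (pending == 0) {
            // Wake up a checkpoint blocked in awaitZero().
            notifyAll();
        }
    }

    synchronized long count() {
        return pending;
    }

    /** Blocks the checkpointing thread until every pending record is acknowledged. */
    synchronized void awaitZero() throws InterruptedException {
        // Loop guards against spurious wakeups.
        while (pending > 0) {
            wait();
        }
    }
}
```

In a sink, `increment()` would be called just before `producer.send(...)` and `acknowledge()` at the end of the send callback whether the send succeeded or failed; otherwise a failed record would keep the checkpoint waiting forever.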
> FlinkKafkaProducer09 sink can lose messages
> -------------------------------------------
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Elias Levy
> Assignee: Robert Metzger
> Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously. A callback can record
> publishing errors, which will be raised when detected. But as far as I can
> tell, there is no barrier to wait for async errors from the sink when
> checkpointing or to track the event time of acked messages to inform the
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the
> requests return a failure after the checkpoint occurred, those messages will
> be lost, as the checkpoint will consider them processed by the sink.
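The barrier the issue asks for can be sketched as follows: a checkpoint first drains in-flight sends and only then rethrows any error recorded by a send callback, so a failed record fails the checkpoint instead of being silently dropped. This is a minimal sketch, assuming a hypothetical abstract class `ErrorBarrierSinkSketch` (not Flink's code). `checkErroneous()` mirrors the method name visible in the diff, `onCompletion(Exception)` is a simplified stand-in for Kafka's send callback, and `flush()` is an abstract hook that a 0.9-style subclass could implement with `KafkaProducer#flush()` and a 0.8-style subclass with a pending-record wait.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch (not Flink's code): a checkpoint must not complete while
 * sends are in flight, and must fail if any of them reported an error.
 */
abstract class ErrorBarrierSinkSketch {
    // First error reported by any asynchronous send callback.
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    /** Simplified send callback: exception is null when the record was acknowledged. */
    protected final void onCompletion(Exception exception) {
        if (exception != null) {
            // Keep only the first failure; later ones are usually consequences of it.
            asyncException.compareAndSet(null, exception);
        }
    }

    /** Must block until every record handed to the producer has completed. */
    protected abstract void flush() throws InterruptedException;

    /** Rethrows the first asynchronous failure, if any. */
    protected final void checkErroneous() throws Exception {
        Exception e = asyncException.get();
        if (e != null) {
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    /** Checkpoint barrier: drain in-flight records first, then surface failures. */
    public final void snapshotState() throws Exception {
        flush();
        checkErroneous();
    }
}
```

The order inside `snapshotState()` matters: checking for errors before draining would miss failures reported by records still in flight, which is exactly the window the issue describes.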
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)