[ https://issues.apache.org/jira/browse/FLINK-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15000350#comment-15000350 ]

ASF GitHub Bot commented on FLINK-2974:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r44530755
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
                }
                return partitionsToSub;
        }
    +
    +   /**
    +    * Thread to periodically commit the current read offset into Zookeeper.
    +    */
    +   private static class PeriodicOffsetCommitter extends Thread {
    +           private long commitInterval;
    +           private volatile boolean running = true;
    +           private FlinkKafkaConsumer consumer;
    +           private final Object stateUpdateLock = new Object();
    +
    +           public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
    +                   this.commitInterval = commitInterval;
    +                   this.consumer = consumer;
    +           }
    +
    +           @Override
    +           public void run() {
    +                   try {
    +                           while (running) {
    +                                   try {
    +                                           Thread.sleep(commitInterval);
    +
    +                                           //  ------------  commit current offsets ----------------
    +
    +                                           // create copy of current offsets
    +                                           long[] currentOffsets;
    +                                           synchronized (stateUpdateLock) {
    +                                                   currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
    +                                           }
    +
    +                                           Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
    +                                           //noinspection unchecked
    +                                           for (TopicPartition tp : (List<TopicPartition>) consumer.subscribedPartitions) {
    +                                                   int partition = tp.partition();
    +                                                   long offset = currentOffsets[partition];
    +                                                   long lastCommitted = consumer.commitedOffsets[partition];
    +
    +                                                   if (offset != OFFSET_NOT_SET) {
    +                                                           if (offset > lastCommitted) {
    +                                                                   offsetsToCommit.put(tp, offset);
    +                                                                   LOG.debug("Committing offset {} for partition {}", offset, partition);
    +                                                           } else {
    +                                                                   LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
    +                                                           }
    +                                                   }
    +                                           }
    +
    +                                           consumer.offsetHandler.commit(offsetsToCommit);
    +                                   } catch (InterruptedException e) {
    +                                           // looks like the thread is being closed. Leave loop
    +                                           break;
    +                                   }
    +                           }
    +                   } catch (Throwable t) {
    +                           LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
    +                           consumer.fetcher.stopWithError(t);
    +                   }
    +           }
    +
    +           public void close() {
    +                   running = false;
    +                   // interrupt sleep
    +                   Thread.currentThread().interrupt();
    --- End diff --
    
    This interrupts the wrong thread, namely the one that calls close() rather than the offset-committing thread.
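
    As a minimal sketch of the fix (stripped down to just the shutdown logic, not the actual Flink code): since `PeriodicOffsetCommitter` extends `Thread`, `close()` should interrupt the committer thread itself via `this.interrupt()`, so the sleeping `run()` loop wakes up and exits. `Thread.currentThread().interrupt()` instead flags whichever thread happened to invoke `close()`.

    ```java
    public class OffsetCommitterSketch {

        /** Hypothetical minimal committer thread illustrating the corrected close(). */
        static class PeriodicOffsetCommitter extends Thread {
            private volatile boolean running = true;

            @Override
            public void run() {
                while (running) {
                    try {
                        Thread.sleep(60_000); // stands in for the commit interval
                        // ... commit current offsets here ...
                    } catch (InterruptedException e) {
                        // interrupted by close(): leave the loop
                        break;
                    }
                }
            }

            public void close() {
                running = false;
                // Interrupt the committer thread itself, not the caller of close().
                this.interrupt();
            }
        }

        public static void main(String[] args) throws Exception {
            PeriodicOffsetCommitter committer = new PeriodicOffsetCommitter();
            committer.start();
            Thread.sleep(100);          // let the committer enter its sleep
            committer.close();          // wakes the committer via interrupt
            committer.join(5_000);      // returns promptly instead of after 60s
            System.out.println("terminated=" + !committer.isAlive());
        }
    }
    ```

    Without the fix, `join()` would block until the full sleep interval elapsed, because the committer thread never receives the interrupt.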


> Add periodic offset commit to Kafka Consumer if checkpointing is disabled
> -------------------------------------------------------------------------
>
>                 Key: FLINK-2974
>                 URL: https://issues.apache.org/jira/browse/FLINK-2974
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>
> Flink only writes the offsets from the consumer into ZK if checkpointing is 
> enabled.
> We should have a similar feature to Kafka's autocommit in our consumer.
> Issue reported by user: 
> http://stackoverflow.com/questions/33501574/flink-kafka-why-am-i-losing-messages



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
