[ https://issues.apache.org/jira/browse/FLINK-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15016524#comment-15016524 ]

ASF GitHub Bot commented on FLINK-2974:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1341#discussion_r45461644
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
    @@ -374,12 +372,33 @@ public void open(Configuration parameters) throws Exception {
                        // no restore request. Let the offset handler take care of the initial offset seeking
                        offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
                }
    +
    +
        }
     
        @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
                if (fetcher != null) {
    +                   // For non-checkpointed sources, a thread which periodically commits the current offset into ZK.
    +                   PeriodicOffsetCommitter offsetCommitter = null;
    +
    +                   // check whether we need to start the periodic checkpoint committer
    +                   StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
    +                   if (!streamingRuntimeContext.isCheckpointingEnabled()) {
    +                           // we use Kafka's own configuration parameter key for this.
    +                           // Note that the default configuration value in Kafka is 60 * 1000, so we use the
    +                           // same here.
    +                           long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
    +                           offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
    +                           offsetCommitter.start();
    +                           LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
    +                   }
    +
                        fetcher.run(sourceContext, valueDeserializer, lastOffsets);
    +
    +                   if(offsetCommitter != null) {
    --- End diff --
    
    Style: missing space between "if" and "(" ;-)


> Add periodic offset commit to Kafka Consumer if checkpointing is disabled
> -------------------------------------------------------------------------
>
>                 Key: FLINK-2974
>                 URL: https://issues.apache.org/jira/browse/FLINK-2974
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Flink only writes the offsets from the consumer into ZK if checkpointing is 
> enabled.
> We should have a similar feature to Kafka's autocommit in our consumer.
> Issue reported by user: 
> http://stackoverflow.com/questions/33501574/flink-kafka-why-am-i-losing-messages
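
For readers following the diff above, here is a minimal, self-contained sketch of the pattern it describes: a background thread that commits offsets at a fixed interval while the fetcher runs, and that is shut down afterwards. This is not the PeriodicOffsetCommitter from the pull request, whose body is not shown in the diff. The names PeriodicCommitterSketch, OffsetCommitFn, PeriodicCommitter, commitIntervalMs and runFor are hypothetical, and the commit callback only increments a counter instead of writing to ZK. The only details taken from the diff are the "auto.commit.interval.ms" key and its "60000" default.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

class PeriodicCommitterSketch {

    /** Hypothetical callback standing in for "write the current offsets to ZK". */
    interface OffsetCommitFn {
        void commit() throws Exception;
    }

    /** Daemon thread that calls the commit callback once per interval until closed. */
    static class PeriodicCommitter extends Thread {
        private final long intervalMs;
        private final OffsetCommitFn commitFn;
        private volatile boolean running = true;

        PeriodicCommitter(long intervalMs, OffsetCommitFn commitFn) {
            super("periodic-offset-committer");
            this.intervalMs = intervalMs;
            this.commitFn = commitFn;
            setDaemon(true);
        }

        @Override
        public void run() {
            try {
                while (running) {
                    Thread.sleep(intervalMs);
                    if (!running) {
                        break;
                    }
                    commitFn.commit();
                }
            } catch (InterruptedException e) {
                // close() interrupts the sleep; nothing to do but exit.
            } catch (Exception e) {
                // A real source would surface this to the task; the sketch only logs it.
                System.err.println("Offset commit failed: " + e);
            }
        }

        /** Stops the loop and wakes the thread if it is sleeping. */
        void close() {
            running = false;
            interrupt();
        }
    }

    /** Reads the interval from Kafka's own key, with the same default as the diff. */
    static long commitIntervalMs(Properties props) {
        return Long.parseLong(props.getProperty("auto.commit.interval.ms", "60000"));
    }

    /** Runs a committer for durationMs and returns how many commits happened. */
    static long runFor(long intervalMs, long durationMs) {
        AtomicLong commits = new AtomicLong();
        PeriodicCommitter committer = new PeriodicCommitter(intervalMs, commits::incrementAndGet);
        committer.start();
        try {
            Thread.sleep(durationMs); // stands in for the blocking fetcher.run(...) call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            committer.close();
            try {
                committer.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return commits.get();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("auto.commit.interval.ms", "20");
        long commits = runFor(commitIntervalMs(props), 200);
        System.out.println("commits performed: " + commits);
    }
}
```

Closing the committer in a finally block means the thread is stopped even if the fetch loop throws, which is one plausible way to handle the shutdown step that the diff cuts off at.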



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
