[ https://issues.apache.org/jira/browse/FLINK-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15000355#comment-15000355 ]
ASF GitHub Bot commented on FLINK-2974:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1341#discussion_r44531013
--- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
@@ -374,12 +376,32 @@ public void open(Configuration parameters) throws Exception {
             // no restore request. Let the offset handler take care of the initial offset seeking
             offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
         }
+
+        // check whether we need to start the periodic checkpoint committer
+        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
+        if(!streamingRuntimeContext.isCheckpointingEnabled()) {
+            // we use Kafka's own configuration parameter key for this.
+            // Note that the default configuration value in Kafka is 60 * 1000, so we use the
+            // same here.
+            long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
+            offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
+            offsetCommitter.start();
+            LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
+        }
     }
     @Override
     public void run(SourceContext<T> sourceContext) throws Exception {
         if (fetcher != null) {
-            fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+            // by default, we use the checkpoint lock for updating the state
+            Object stateUpdateLock = sourceContext.getCheckpointLock();
+
+            // if checkpointing is disabled, we use the checkpoint committer's lock object
+            StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
+            if(!streamingRuntimeContext.isCheckpointingEnabled()) {
--- End diff --
Code style: missing space between `if` and `(`.
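
For readers following the thread without the full patch, here is a minimal sketch of the idea behind the periodic committer in the diff above: a background thread that wakes up every `auto.commit.interval.ms`, snapshots the current offset under a lock, and hands it to a commit target. The class name `PeriodicCommitterSketch`, its constructor arguments, and the `close()` method are hypothetical stand-ins for illustration; this is not the `PeriodicOffsetCommitter` from the pull request, and it tracks a single offset rather than one per partition.

```java
import java.util.Properties;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical illustration only; not Flink's PeriodicOffsetCommitter.
 * Periodically snapshots an offset under a lock and passes it to a commit target.
 */
class PeriodicCommitterSketch extends Thread {
    private final long commitIntervalMs;
    private final Object stateLock;           // guards the offset state (assumed shared with the fetcher)
    private final LongSupplier currentOffset; // reads the latest processed offset
    private final LongConsumer commitTarget;  // e.g. a write to ZooKeeper (assumption)
    private volatile boolean running = true;

    PeriodicCommitterSketch(long commitIntervalMs, Object stateLock,
                            LongSupplier currentOffset, LongConsumer commitTarget) {
        super("periodic-offset-committer-sketch");
        setDaemon(true);
        this.commitIntervalMs = commitIntervalMs;
        this.stateLock = stateLock;
        this.currentOffset = currentOffset;
        this.commitTarget = commitTarget;
    }

    /** Reads the interval from Kafka's own key, falling back to 60000 ms as in the diff. */
    static long commitIntervalFrom(Properties props) {
        return Long.parseLong(props.getProperty("auto.commit.interval.ms", "60000"));
    }

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(commitIntervalMs);
                long snapshot;
                // Take the snapshot under the lock so it is consistent with state updates.
                synchronized (stateLock) {
                    snapshot = currentOffset.getAsLong();
                }
                // Commit outside the lock so a slow commit does not block the fetcher.
                commitTarget.accept(snapshot);
            }
        } catch (InterruptedException e) {
            // Interrupted by close(): restore the flag and let the thread exit.
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the committer thread. */
    void close() {
        running = false;
        interrupt();
    }
}
```

Committing outside the synchronized block is a design choice of this sketch, made so that a slow commit cannot stall whoever updates the offsets; whether the pull request does the same is not visible in the excerpt above.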
> Add periodic offset commit to Kafka Consumer if checkpointing is disabled
> -------------------------------------------------------------------------
>
> Key: FLINK-2974
> URL: https://issues.apache.org/jira/browse/FLINK-2974
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
>
> Flink only writes the offsets from the consumer into ZK if checkpointing is
> enabled.
> We should have a similar feature to Kafka's autocommit in our consumer.
> Issue reported by user:
> http://stackoverflow.com/questions/33501574/flink-kafka-why-am-i-losing-messages