[
https://issues.apache.org/jira/browse/FLINK-2974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15000354#comment-15000354
]
ASF GitHub Bot commented on FLINK-2974:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1341#discussion_r44530920
--- Diff: flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---
@@ -567,6 +604,75 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		}
 		return partitionsToSub;
 	}
+
+	/**
+	 * Thread to periodically commit the current read offset into Zookeeper.
+	 */
+	private static class PeriodicOffsetCommitter extends Thread {
+		private long commitInterval;
+		private volatile boolean running = true;
+		private FlinkKafkaConsumer consumer;
+		private final Object stateUpdateLock = new Object();
+
+		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
+			this.commitInterval = commitInterval;
+			this.consumer = consumer;
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (running) {
+					try {
+						Thread.sleep(commitInterval);
+
+						// ------------ commit current offsets ----------------
+
+						// create copy of current offsets
+						long[] currentOffsets;
+						synchronized (stateUpdateLock) {
+							currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);
+						}
+
+						Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
+						//noinspection unchecked
+						for (TopicPartition tp : (List<TopicPartition>) consumer.subscribedPartitions) {
+							int partition = tp.partition();
+							long offset = currentOffsets[partition];
+							long lastCommitted = consumer.commitedOffsets[partition];
+
+							if (offset != OFFSET_NOT_SET) {
+								if (offset > lastCommitted) {
+									offsetsToCommit.put(tp, offset);
+									LOG.debug("Committing offset {} for partition {}", offset, partition);
+								} else {
+									LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+								}
+							}
+						}
+
+						consumer.offsetHandler.commit(offsetsToCommit);
+					} catch (InterruptedException e) {
+						// looks like the thread is being closed. Leave loop
--- End diff --
Good style is to check whether the `running` flag is still true. If it is,
throw an error; if not, break out of the loop. That catches the case where
the thread is interrupted without a shutdown, and you want the fetcher to
learn about it.
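The pattern the review suggests could be sketched as follows. This is an illustrative standalone class, not code from the PR; the class name, the `sawUnexpectedInterrupt` field, and the 1-second sleep stand in for the real commit loop:

```java
// Sketch of the suggested interrupt handling: on InterruptedException,
// consult the `running` flag. If it is still true, the interrupt was not
// part of a shutdown and should be surfaced as an error; otherwise the
// loop can simply exit.
public class InterruptAwareCommitter extends Thread {
    private volatile boolean running = true;
    volatile boolean sawUnexpectedInterrupt = false;

    public void shutdown() {
        running = false;
        interrupt(); // wake the thread out of sleep()
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(1000); // stand-in for the commit interval
                // ... commit offsets here ...
            } catch (InterruptedException e) {
                if (running) {
                    // interrupted without shutdown: propagate so the
                    // owner (e.g. the fetcher) learns about it
                    sawUnexpectedInterrupt = true;
                    throw new RuntimeException("committer interrupted unexpectedly", e);
                } else {
                    // orderly shutdown: leave the loop quietly
                    break;
                }
            }
        }
    }
}
```

On an orderly `shutdown()` the flag is cleared before the interrupt is delivered, so the catch block takes the quiet `break` path; any other interrupt is treated as a failure.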
> Add periodic offset commit to Kafka Consumer if checkpointing is disabled
> -------------------------------------------------------------------------
>
> Key: FLINK-2974
> URL: https://issues.apache.org/jira/browse/FLINK-2974
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
>
> Flink only writes the offsets from the consumer into ZK if checkpointing is
> enabled.
> We should have a similar feature to Kafka's autocommit in our consumer.
> Issue reported by user:
> http://stackoverflow.com/questions/33501574/flink-kafka-why-am-i-losing-messages
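For reference, the autocommit behavior the description alludes to is configured in Kafka's 0.8 high-level consumer through two consumer properties. The snippet below is only illustrative of what a comparable Flink-side setting would mirror (property names as documented for the Kafka 0.8 consumer; the class name is made up for this example):

```java
import java.util.Properties;

public class AutoCommitProps {
    // Illustrative: Kafka 0.8 high-level consumer autocommit settings.
    public static Properties autoCommitProps() {
        Properties props = new Properties();
        props.setProperty("auto.commit.enable", "true");       // commit offsets to ZK automatically
        props.setProperty("auto.commit.interval.ms", "60000"); // commit period in milliseconds
        return props;
    }
}
```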
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)