This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0bbdb80c4dc87557ff5e52ca8624d15568f8ba41 Author: Cemo <[email protected]> AuthorDate: Wed Mar 28 20:14:41 2018 +0300 KAFKA-6711: Add control for restoring in memory buckets --- .../kafka/streams/processor/internals/GlobalStateManagerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 7c5c874..fac1526 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -246,7 +246,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); final Long checkpoint = checkpointableOffsets.get(topicPartition); - if (checkpoint != null) { + if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) { globalConsumer.seek(topicPartition, checkpoint); } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); -- To stop receiving notification emails like this one, please contact [email protected].
