This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 
kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0bbdb80c4dc87557ff5e52ca8624d15568f8ba41
Author: Cemo <[email protected]>
AuthorDate: Wed Mar 28 20:14:41 2018 +0300

    KAFKA-6711: Add control for restoring in memory buckets
---
 .../kafka/streams/processor/internals/GlobalStateManagerImpl.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 7c5c874..fac1526 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -246,7 +246,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
             final Long checkpoint = checkpointableOffsets.get(topicPartition);
-            if (checkpoint != null) {
+            if (checkpoint != null && checkpoint > 
StateRestorer.NO_CHECKPOINT) {
                 globalConsumer.seek(topicPartition, checkpoint);
             } else {
                 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to