This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1a349142cd6daf1dfeece02cc0553b1b7d82c2c8
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Jun 13 16:30:19 2018 -0700

    Addressed GitHub review comments
    Moved the immutable set of non-persistent store topics into the constructor
    Don't write non-persistent partitions into checkpoint file
---
 .../internals/GlobalStateManagerImpl.java          | 23 +++++++++++-----------
 .../internals/GlobalStateManagerImplTest.java      |  4 ++--
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index fac1526..05c82d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -63,6 +63,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
     private final int retries;
     private final long retryBackoffMs;
     private final Duration pollTime;
+    private final Set<String> globalNonPersistentStoresTopics = new 
HashSet<>();
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -72,6 +73,14 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                                   final StreamsConfig config) {
         super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
+        // Find non persistent store's topics
+        final Map<String, String> storeToChangelogTopic = 
topology.storeToChangelogTopic();
+        for (final StateStore store : topology.globalStateStores()) {
+            if (!store.persistent()) {
+                
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
+
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
@@ -246,7 +255,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
             final Long checkpoint = checkpointableOffsets.get(topicPartition);
-            if (checkpoint != null && checkpoint > 
StateRestorer.NO_CHECKPOINT) {
+            if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
             } else {
                 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
@@ -337,16 +346,6 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
 
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
-
-        // Find non persistent store's topics
-        final Map<String, String> storeToChangelogTopic = 
topology.storeToChangelogTopic();
-        final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
-        for (final StateStore store : topology.globalStateStores()) {
-            if (!store.persistent() && 
storeToChangelogTopic.containsKey(store.name())) {
-                
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
-            }
-        }
-
         checkpointableOffsets.putAll(offsets);
 
         final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
@@ -355,7 +354,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
         for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointableOffsets.entrySet()) {
             final String topic = topicPartitionOffset.getKey().topic();
             if (globalNonPersistentStoresTopics.contains(topic)) {
-                filteredOffsets.put(topicPartitionOffset.getKey(), (long) 
StateRestorer.NO_CHECKPOINT);
+                filteredOffsets.remove(topicPartitionOffset.getKey());
             } else {
                 filteredOffsets.put(topicPartitionOffset.getKey(), 
topicPartitionOffset.getValue());
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 19af5f4..e37f6a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -493,9 +493,9 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         initializeConsumer(10, 1, t3);
         stateManager.register(store3, stateRestoreCallback);
-        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+        stateManager.close(Collections.emptyMap());
 
-        assertThat(readOffsetsCheckpoint(), 
equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT)));
+        assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
     }
 
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws 
IOException {

-- 
To stop receiving notification emails like this one, please contact
mj...@apache.org.

Reply via email to