This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 90236b063e1ca51363d1223dac8c25b8744af39c
Author: Cemo <[email protected]>
AuthorDate: Wed Mar 28 01:31:51 2018 +0300

    KAFKA-6711: Address mjsax's reviews
---
 .../processor/internals/GlobalStateManagerImpl.java      | 10 +++++-----
 .../processor/internals/GlobalStateManagerImplTest.java  | 16 +---------------
 2 files changed, 6 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f9f5878..26cfcf7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,9 +339,9 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
 
         // Find non persistent store's topics
-        Map<String, String> storeToChangelogTopic = 
topology.storeToChangelogTopic();
-        Set<String> globalNonPersistentStoresTopics = new HashSet<>();
-        for (StateStore store : topology.globalStateStores()) {
+        final Map<String, String> storeToChangelogTopic = 
topology.storeToChangelogTopic();
+        final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+        for (final StateStore store : topology.globalStateStores()) {
             if (!store.persistent() && 
storeToChangelogTopic.containsKey(store.name())) {
                 
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
             }
@@ -352,8 +352,8 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
         final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
 
         // Skip non persistent store
-        for (Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointableOffsets.entrySet()) {
-            String topic = topicPartitionOffset.getKey().topic();
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointableOffsets.entrySet()) {
+            final String topic = topicPartitionOffset.getKey().topic();
             if (globalNonPersistentStoresTopics.contains(topic)) {
                 log.debug("Skipping global store' topic {}", topic);
             } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 3a1cf0d..7769c0a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -495,21 +495,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store3, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
 
-        final OffsetCheckpoint checkpoint = new 
OffsetCheckpoint(checkpointFile);
-
-        assertThat(checkpoint.read(), equalTo(Collections.<TopicPartition, 
Long>emptyMap()));
-    }
-
-    @Test
-    public void shouldNotSkipGlobalInMemoryStoreOffsetsToFile() throws 
IOException {
-        stateManager.initialize();
-        initializeConsumer(10, 1, t1);
-        stateManager.register(store1, stateRestoreCallback);
-        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-
-        final OffsetCheckpoint checkpoint = new 
OffsetCheckpoint(checkpointFile);
-
-        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(t1, 
11L)));
+        assertThat(readOffsetsCheckpoint(), 
equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
 
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws 
IOException {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to