This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 90236b063e1ca51363d1223dac8c25b8744af39c
Author: Cemo <[email protected]>
AuthorDate: Wed Mar 28 01:31:51 2018 +0300

    KAFKA-6711: Address mjsax's reviews
---
 .../processor/internals/GlobalStateManagerImpl.java | 10 +++++-----
 .../processor/internals/GlobalStateManagerImplTest.java | 16 +---------------
 2 files changed, 6 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f9f5878..26cfcf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,9 +339,9 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
         // Find non persistent store's topics
-        Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
-        Set<String> globalNonPersistentStoresTopics = new HashSet<>();
-        for (StateStore store : topology.globalStateStores()) {
+        final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+        final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+        for (final StateStore store : topology.globalStateStores()) {
             if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) {
                 globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
             }
@@ -352,8 +352,8 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
         // Skip non persistent store
-        for (Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
-            String topic = topicPartitionOffset.getKey().topic();
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+            final String topic = topicPartitionOffset.getKey().topic();
             if (globalNonPersistentStoresTopics.contains(topic)) {
                 log.debug("Skipping global store' topic {}", topic);
             } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 3a1cf0d..7769c0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -495,21 +495,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store3, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
-
-        assertThat(checkpoint.read(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
-    }
-
-    @Test
-    public void shouldNotSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
-        stateManager.initialize();
-        initializeConsumer(10, 1, t1);
-        stateManager.register(store1, stateRestoreCallback);
-        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
-
-        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(t1, 11L)));
+        assertThat(readOffsetsCheckpoint(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
--
To stop receiving notification emails like this one, please contact [email protected].
