This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e1956f5e979938909abc65c82b10199b5abdcec7 Author: Cemo <[email protected]> AuthorDate: Tue Mar 27 23:17:33 2018 +0300 KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file --- .../internals/GlobalStateManagerImpl.java | 28 ++++++++++++++++++++-- .../internals/GlobalStateManagerImplTest.java | 24 +++++++++++++++++++ .../org/apache/kafka/test/NoOpReadOnlyStore.java | 2 +- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 78c4a36..f9f5878 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -42,6 +42,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -336,10 +337,33 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { + + // Find non persistent store's topics + Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); + Set<String> globalNonPersistentStoresTopics = new HashSet<>(); + for (StateStore store : topology.globalStateStores()) { + if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) { + globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); + } + } + checkpointableOffsets.putAll(offsets); - if (!checkpointableOffsets.isEmpty()) { + + final Map<TopicPartition, Long> filteredOffsets = new HashMap<>(); + + // Skip non persistent store + for (Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) { + String topic = topicPartitionOffset.getKey().topic(); + if (globalNonPersistentStoresTopics.contains(topic)) { + log.debug("Skipping global store' topic {}", topic); + } else { + filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); + } + } + + if (!filteredOffsets.isEmpty()) { try { - checkpoint.write(checkpointableOffsets); + checkpoint.write(filteredOffsets); } catch (final IOException e) { log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 2ca9c21..3a1cf0d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -488,6 +488,30 @@ public class GlobalStateManagerImplTest { assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } + @Test + public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { + stateManager.initialize(); + initializeConsumer(10, 1, t3); + stateManager.register(store3, stateRestoreCallback); + stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile); + + assertThat(checkpoint.read(), equalTo(Collections.<TopicPartition, Long>emptyMap())); + } + + @Test + public void shouldNotSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { + stateManager.initialize(); + initializeConsumer(10, 1, t1); + stateManager.register(store1, stateRestoreCallback); + stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile); + + assertThat(checkpoint.read(), equalTo(Collections.singletonMap(t1, 11L))); + } + private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index ae46b8d..08945d5 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -95,7 +95,7 @@ public class NoOpReadOnlyStore<K, V> @Override public boolean persistent() { - return false; + return rocksdbStore; } @Override -- To stop receiving notification emails like this one, please contact [email protected].
