This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1a349142cd6daf1dfeece02cc0553b1b7d82c2c8 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Wed Jun 13 16:30:19 2018 -0700 Github comments Moved immutable part into constructor Don't write non-persistent partitions into checkpoint file --- .../internals/GlobalStateManagerImpl.java | 23 +++++++++++----------- .../internals/GlobalStateManagerImplTest.java | 4 ++-- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index fac1526..05c82d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -63,6 +63,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob private final int retries; private final long retryBackoffMs; private final Duration pollTime; + private final Set<String> globalNonPersistentStoresTopics = new HashSet<>(); public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -72,6 +73,14 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final StreamsConfig config) { super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))); + // Find non persistent store's topics + final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); + for (final StateStore store : topology.globalStateStores()) { + if (!store.persistent()) { + globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); + } + } + this.log = logContext.logger(GlobalStateManagerImpl.class); this.topology = topology; this.globalConsumer = globalConsumer; @@ -246,7 +255,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); final Long checkpoint = checkpointableOffsets.get(topicPartition); - if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) { + if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); @@ -337,16 +346,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { - - // Find non persistent store's topics - final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); - final Set<String> globalNonPersistentStoresTopics = new HashSet<>(); - for (final StateStore store : topology.globalStateStores()) { - if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) { - globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); - } - } - checkpointableOffsets.putAll(offsets); final Map<TopicPartition, Long> filteredOffsets = new HashMap<>(); @@ -355,7 +354,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) { final String topic = topicPartitionOffset.getKey().topic(); if (globalNonPersistentStoresTopics.contains(topic)) { - filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT); + filteredOffsets.remove(topicPartitionOffset.getKey()); } else { filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 19af5f4..e37f6a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -493,9 +493,9 @@ public class GlobalStateManagerImplTest { stateManager.initialize(); initializeConsumer(10, 1, t3); stateManager.register(store3, stateRestoreCallback); - stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + stateManager.close(Collections.emptyMap()); - assertThat(readOffsetsCheckpoint(), equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT))); + assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap())); } private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { -- To stop receiving notification emails like this one, please contact mj...@apache.org.