This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new f15c986 KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219) f15c986 is described below commit f15c986770f93a20e7552764c058b7ba5d52233e Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Thu Jun 14 04:44:09 2018 -0700 KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219) --- .../internals/GlobalStateManagerImpl.java | 29 ++++++++++++++++++---- .../integration/GlobalKTableIntegrationTest.java | 26 ++++++++++++++++++- .../internals/GlobalStateManagerImplTest.java | 10 ++++++++ .../org/apache/kafka/test/NoOpReadOnlyStore.java | 2 +- 4 files changed, 60 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 54785aa..fab6769 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -60,6 +61,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob private InternalProcessorContext processorContext; private final int retries; private final long retryBackoffMs; + private final Set<String> globalNonPersistentStoresTopics = new HashSet<>(); public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -69,6 +71,14 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final StreamsConfig config) { super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))); + // Find non persistent store's topics + final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); + for (final StateStore store : topology.globalStateStores()) { + if (!store.persistent()) { + globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); + } + } + this.log = logContext.logger(GlobalStateManagerImpl.class); this.topology = topology; this.globalConsumer = globalConsumer; @@ -334,13 +344,22 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { checkpointableOffsets.putAll(offsets); - if (!checkpointableOffsets.isEmpty()) { - try { - checkpoint.write(checkpointableOffsets); - } catch (final IOException e) { - log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e); + + final Map<TopicPartition, Long> filteredOffsets = new HashMap<>(); + + // Skip non persistent store + for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) { + final String topic = topicPartitionOffset.getKey().topic(); + if (!globalNonPersistentStoresTopics.contains(topic)) { + filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); } } + + try { + checkpoint.write(filteredOffsets); + } catch (final IOException e) { + log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e); + } } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 5723f14..576ba82 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -53,6 +54,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; + @Category({IntegrationTest.class}) public class GlobalKTableIntegrationTest { private static final int NUM_BROKERS = 1; @@ -220,7 +224,27 @@ public class GlobalKTableIntegrationTest { } }, 30000L, "waiting for final values"); } - + + @Test + public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { + builder = new StreamsBuilder(); + globalTable = builder.globalTable( + globalTableTopic, + Consumed.with(Serdes.Long(), Serdes.String()), + Materialized.<Long, String>as(Stores.inMemoryKeyValueStore(globalStore))); + + produceInitialGlobalTableValues(); + + startStreams(); + ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore()); + assertThat(store.approximateNumEntries(), equalTo(4L)); + kafkaStreams.close(); + + startStreams(); + store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore()); + assertThat(store.approximateNumEntries(), equalTo(4L)); + } + private void createTopics() throws InterruptedException { streamTopic = "stream-" + testNo; globalTableTopic = "globalTable-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index df8d201..c449ec5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -488,6 +488,16 @@ public class GlobalStateManagerImplTest { assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } + @Test + public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { + stateManager.initialize(); + initializeConsumer(10, 1, t3); + stateManager.register(store3, stateRestoreCallback); + stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + + assertThat(readOffsetsCheckpoint(), equalTo(Collections.<TopicPartition, Long>emptyMap())); + } + private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index ae46b8d..08945d5 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -95,7 +95,7 @@ public class NoOpReadOnlyStore<K, V> @Override public boolean persistent() { - return false; + return rocksdbStore; } @Override -- To stop receiving notification emails like this one, please contact mj...@apache.org.