Repository: kafka
Updated Branches:
  refs/heads/0.10.2 bce189fa4 -> 9eb0cdb54
KAFKA-4317: Checkpoint state stores on commit interval

This is a backport of https://github.com/apache/kafka/pull/2471

Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3024 from dguy/k4881-bp


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9eb0cdb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9eb0cdb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9eb0cdb5

Branch: refs/heads/0.10.2
Commit: 9eb0cdb546a6ec7792cce12a0ceb6cd30afc88ad
Parents: bce189f
Author: Damian Guy <[email protected]>
Authored: Thu May 11 23:18:23 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu May 11 23:18:23 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java | 19 +-
 .../processor/internals/Checkpointable.java | 27 +++
 .../internals/GlobalStateManagerImpl.java | 16 +-
 .../internals/GlobalStateUpdateTask.java | 3 +-
 .../internals/ProcessorStateManager.java | 58 +++---
 .../processor/internals/StandbyTask.java | 23 +--
 .../processor/internals/StateManager.java | 4 +-
 .../streams/processor/internals/StreamTask.java | 5 +-
 .../state/internals/InMemoryKeyValueStore.java | 187 ++++++++++++++++++
 .../processor/internals/AbstractTaskTest.java | 1 +
 .../internals/GlobalStateManagerImplTest.java | 49 ++++-
 .../internals/GlobalStateTaskTest.java | 16 +-
 .../internals/ProcessorStateManagerTest.java | 198 +++++++++++++------
 .../processor/internals/StandbyTaskTest.java | 49 ++++-
 .../processor/internals/StreamTaskTest.java | 63 ++++++
 .../kafka/test/GlobalStateManagerStub.java | 7 +-
 .../kafka/test/ProcessorTopologyTestDriver.java | 3 +-
 17 files changed, 588 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 55418d5..8de5d23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -52,14 +52,14 @@ public abstract class AbstractTask { /** * @throws ProcessorStateException if the state manager cannot be created */ - protected AbstractTask(TaskId id, - String applicationId, - Collection<TopicPartition> partitions, - ProcessorTopology topology, - Consumer<byte[], byte[]> consumer, - Consumer<byte[], byte[]> restoreConsumer, - boolean isStandby, - StateDirectory stateDirectory, + protected AbstractTask(final TaskId id, + final String applicationId, + final Collection<TopicPartition> partitions, + final ProcessorTopology topology, + final Consumer<byte[], byte[]> consumer, + final Consumer<byte[], byte[]> restoreConsumer, + final boolean isStandby, + final StateDirectory stateDirectory, final ThreadCache cache) { this.id = id; this.applicationId = applicationId; @@ -70,8 +70,7 @@ public abstract class AbstractTask { // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); - + stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java new file mode 100644 index 0000000..7b02d5b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + +// Interface to indicate that an object has associated partition offsets that can be checkpointed +interface Checkpointable { + void checkpoint(final Map<TopicPartition, Long> offsets); + Map<TopicPartition, Long> checkpointed(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 7534993..3819bb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -58,11 +58,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager { private final File baseDir; private final OffsetCheckpoint checkpoint; private final Set<String> globalStoreNames = new HashSet<>(); - private HashMap<TopicPartition, Long> checkpointableOffsets; + private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>(); public GlobalStateManagerImpl(final ProcessorTopology topology, - final Consumer<byte[], byte[]> consumer, - final StateDirectory stateDirectory) { + final Consumer<byte[], byte[]> consumer, + final StateDirectory stateDirectory) { this.topology = topology; this.consumer = consumer; this.stateDirectory = stateDirectory; @@ -81,8 +81,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } try { - this.checkpointableOffsets = new HashMap<>(checkpoint.read()); - checkpoint.delete(); + this.checkpointableOffsets.putAll(checkpoint.read()); } catch (IOException e) { try { 
stateDirectory.unlockGlobalState(); @@ -220,13 +219,14 @@ public class GlobalStateManagerImpl implements GlobalStateManager { if (closeFailed.length() > 0) { throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed); } - writeCheckpoints(offsets); + checkpoint(offsets); } finally { stateDirectory.unlockGlobalState(); } } - private void writeCheckpoints(final Map<TopicPartition, Long> offsets) { + @Override + public void checkpoint(final Map<TopicPartition, Long> offsets) { if (!offsets.isEmpty()) { checkpointableOffsets.putAll(offsets); try { @@ -238,7 +238,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } @Override - public Map<TopicPartition, Long> checkpointedOffsets() { + public Map<TopicPartition, Long> checkpointed() { return Collections.unmodifiableMap(checkpointableOffsets); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 40f2a3c..6da37e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -67,7 +67,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { } initTopology(); processorContext.initialized(); - return stateMgr.checkpointedOffsets(); + return stateMgr.checkpointed(); } @@ -89,6 +89,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { public void flushState() { stateMgr.flush(processorContext); + stateMgr.checkpoint(offsets); } public void close() throws IOException { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 2ef9634..ea074da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -68,6 +68,7 @@ public class ProcessorStateManager implements StateManager { // TODO: this map does not work with customized grouper where multiple partitions // of the same topic can be assigned to the same topic. private final Map<String, TopicPartition> partitionForTopic; + private final OffsetCheckpoint checkpoint; /** * @throws LockException if the state directory cannot be locked because another thread holds the lock @@ -111,11 +112,8 @@ public class ProcessorStateManager implements StateManager { } // load the checkpoint information - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); + checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); this.checkpointedOffsets = new HashMap<>(checkpoint.read()); - - // delete the checkpoint file after finish loading its stored offsets - checkpoint.delete(); } @@ -263,7 +261,7 @@ public class ProcessorStateManager implements StateManager { } } - public Map<TopicPartition, Long> checkpointedOffsets() { + public Map<TopicPartition, Long> checkpointed() { Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>(); for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) { @@ -360,30 +358,7 @@ public class ProcessorStateManager implements StateManager { } if (ackedOffsets != 
null) { - Map<TopicPartition, Long> checkpointOffsets = new HashMap<>(); - for (String storeName : stores.keySet()) { - // only checkpoint the offset to the offsets file if - // it is persistent AND changelog enabled - if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { - String changelogTopic = storeToChangelogTopic.get(storeName); - TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); - - Long offset = ackedOffsets.get(topicPartition); - - if (offset != null) { - // store the last offset + 1 (the log position after restoration) - checkpointOffsets.put(topicPartition, offset + 1); - } else { - // if no record was produced. we need to check the restored offset. - offset = restoredOffsets.get(topicPartition); - if (offset != null) - checkpointOffsets.put(topicPartition, offset); - } - } - } - // write the checkpoint file before closing, to indicate clean shutdown - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); - checkpoint.write(checkpointOffsets); + checkpoint(ackedOffsets); } } @@ -393,6 +368,31 @@ public class ProcessorStateManager implements StateManager { } } + // write the checkpoint + @Override + public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) { + for (String storeName : stores.keySet()) { + // only checkpoint the offset to the offsets file if + // it is persistent AND changelog enabled + if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { + final String changelogTopic = storeToChangelogTopic.get(storeName); + final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); + if (ackedOffsets.containsKey(topicPartition)) { + // store the last offset + 1 (the log position after restoration) + checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1); + } else if (restoredOffsets.containsKey(topicPartition)) { + 
checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition)); + } + } + } + // write the checkpoint file before closing, to indicate clean shutdown + try { + checkpoint.write(checkpointedOffsets); + } catch (IOException e) { + log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e); + } + } + private int getPartition(String topic) { TopicPartition partition = partitionForTopic.get(topic); http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 4437a19..a27098c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -51,14 +51,15 @@ public class StandbyTask extends AbstractTask { * @param metrics the {@link StreamsMetrics} created by the thread * @param stateDirectory the {@link StateDirectory} created by the thread */ - public StandbyTask(TaskId id, - String applicationId, - Collection<TopicPartition> partitions, - ProcessorTopology topology, - Consumer<byte[], byte[]> consumer, - Consumer<byte[], byte[]> restoreConsumer, - StreamsConfig config, - StreamsMetrics metrics, final StateDirectory stateDirectory) { + public StandbyTask(final TaskId id, + final String applicationId, + final Collection<TopicPartition> partitions, + final ProcessorTopology topology, + final Consumer<byte[], byte[]> consumer, + final Consumer<byte[], byte[]> restoreConsumer, + final StreamsConfig config, + final StreamsMetrics metrics, + final StateDirectory stateDirectory) { super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, 
stateDirectory, null); // initialize the topology with its own context @@ -67,9 +68,9 @@ public class StandbyTask extends AbstractTask { log.info("standby-task [{}] Initializing state stores", id()); initializeStateStores(); - ((StandbyContextImpl) this.processorContext).initialized(); + this.processorContext.initialized(); - this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets()); + this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); } public Map<TopicPartition, Long> checkpointedOffsets() { @@ -92,7 +93,7 @@ public class StandbyTask extends AbstractTask { public void commit() { log.debug("standby-task [{}] Committing its state", id()); stateMgr.flush(processorContext); - + stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap()); // reinitialize offset limits initializeOffsetLimits(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 7343c85..3102b77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -24,7 +24,7 @@ import java.io.File; import java.io.IOException; import java.util.Map; -interface StateManager { +interface StateManager extends Checkpointable { File baseDir(); void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback); @@ -36,6 +36,4 @@ interface StateManager { StateStore getGlobalStore(final String name); StateStore getStore(final String name); - - Map<TopicPartition, Long> checkpointedOffsets(); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7375fb5..3270596 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -74,8 +74,9 @@ public class StreamTask extends AbstractTask implements Punctuator { log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix); // 2) flush produced records in the downstream and change logs of local states recordCollector.flush(); - - // 3) commit consumed offsets if it is dirty already + // 3) write checkpoints for any local state + stateMgr.checkpoint(recordCollectorOffsets()); + // 4) commit consumed offsets if it is dirty already commitOffsets(); } }; http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java new file mode 100644 index 0000000..dbcc219 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StateSerdes; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + + +public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> { + private final String name; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; + private final NavigableMap<K, V> map; + private volatile boolean open = false; + + private StateSerdes<K, V> serdes; + + public InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) { + this.name = name; + this.keySerde = keySerde; + this.valueSerde = valueSerde; + + // TODO: when we have serde associated with class types, we can + // improve this situation by passing the comparator here. 
+ this.map = new TreeMap<>(); + } + + public KeyValueStore<K, V> enableLogging() { + return new InMemoryKeyValueLoggedStore<>(name, this, keySerde, valueSerde); + } + + @Override + public String name() { + return this.name; + } + + @Override + @SuppressWarnings("unchecked") + public void init(ProcessorContext context, StateStore root) { + // construct the serde + this.serdes = new StateSerdes<>(name, + keySerde == null ? (Serde<K>) context.keySerde() : keySerde, + valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + + if (root != null) { + // register the store + context.register(root, true, new StateRestoreCallback() { + @Override + public void restore(byte[] key, byte[] value) { + // check value for null, to avoid deserialization error. + if (value == null) { + put(serdes.keyFrom(key), null); + } else { + put(serdes.keyFrom(key), serdes.valueFrom(value)); + } + } + }); + } + + this.open = true; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return this.open; + } + + @Override + public synchronized V get(K key) { + return this.map.get(key); + } + + @Override + public synchronized void put(K key, V value) { + this.map.put(key, value); + } + + @Override + public synchronized V putIfAbsent(K key, V value) { + V originalValue = get(key); + if (originalValue == null) { + put(key, value); + } + return originalValue; + } + + @Override + public synchronized void putAll(List<KeyValue<K, V>> entries) { + for (KeyValue<K, V> entry : entries) + put(entry.key, entry.value); + } + + @Override + public synchronized V delete(K key) { + return this.map.remove(key); + } + + @Override + public synchronized KeyValueIterator<K, V> range(K from, K to) { + return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + } + + @Override + public synchronized KeyValueIterator<K, V> all() { + final TreeMap<K, V> copy = new 
TreeMap<>(this.map); + return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator())); + } + + @Override + public long approximateNumEntries() { + return this.map.size(); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + this.map.clear(); + this.open = false; + } + + private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> { + private final Iterator<Map.Entry<K, V>> iter; + + private InMemoryKeyValueIterator(Iterator<Map.Entry<K, V>> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue<K, V> next() { + Map.Entry<K, V> entry = iter.next(); + return new KeyValue<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 16967bc..a2346fc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -71,6 +71,7 @@ public class AbstractTaskTest { consumer, consumer, false, + new StateDirectory("app", TestUtils.tempDirectory().getPath()), new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) { @Override 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index db51cef..8c9cf19 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -49,6 +49,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -122,15 +124,15 @@ public class GlobalStateManagerImplTest { final Map<TopicPartition, Long> expected = writeCheckpoint(); stateManager.initialize(context); - final Map<TopicPartition, Long> offsets = stateManager.checkpointedOffsets(); + final Map<TopicPartition, Long> offsets = stateManager.checkpointed(); assertEquals(expected, offsets); } @Test - public void shouldDeleteCheckpointFileAfteLoaded() throws Exception { + public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception { writeCheckpoint(); stateManager.initialize(context); - assertFalse(checkpointFile.exists()); + assertTrue(checkpointFile.exists()); } @Test(expected = StreamsException.class) @@ -168,7 +170,7 @@ public class GlobalStateManagerImplTest { } @Test - public void shouldThrowIllegalArgumenExceptionIfAttemptingToRegisterStoreTwice() throws Exception { + public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception { stateManager.initialize(context); 
initializeConsumer(2, 1, t1); stateManager.register(store1, false, new TheStateRestoreCallback()); @@ -271,9 +273,7 @@ public class GlobalStateManagerImplTest { stateManager.register(store1, false, stateRestoreCallback); final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L); stateManager.close(expected); - final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), - ProcessorStateManager.CHECKPOINT_FILE_NAME)); - final Map<TopicPartition, Long> result = offsetCheckpoint.read(); + final Map<TopicPartition, Long> result = readOffsetsCheckpoint(); assertEquals(expected, result); } @@ -379,6 +379,41 @@ public class GlobalStateManagerImplTest { } } + @Test + public void shouldCheckpointOffsets() throws Exception { + final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(context); + + stateManager.checkpoint(offsets); + + final Map<TopicPartition, Long> result = readOffsetsCheckpoint(); + assertThat(result, equalTo(offsets)); + assertThat(stateManager.checkpointed(), equalTo(offsets)); + } + + @Test + public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception { + stateManager.initialize(context); + final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + initializeConsumer(10, 1, t1); + stateManager.register(store1, false, stateRestoreCallback); + initializeConsumer(20, 1, t2); + stateManager.register(store2, false, stateRestoreCallback); + + final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed(); + stateManager.checkpoint(Collections.singletonMap(t1, 101L)); + + final Map<TopicPartition, Long> updatedCheckpoint = stateManager.checkpointed(); + assertThat(updatedCheckpoint.get(t2), equalTo(initialCheckpoint.get(t2))); + assertThat(updatedCheckpoint.get(t1), equalTo(101L)); + } + + private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { + final OffsetCheckpoint 
offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), + ProcessorStateManager.CHECKPOINT_FILE_NAME)); + return offsetCheckpoint.read(); + } + private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) { final HashMap<TopicPartition, Long> startOffsets = new HashMap<>(); startOffsets.put(topicPartition, 1L); http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index df0b73c..66999bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -38,6 +38,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -137,7 +139,19 @@ public class GlobalStateTaskTest { globalStateTask.initialize(); globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "foo".getBytes(), "foo".getBytes())); globalStateTask.close(); - assertEquals(expectedOffsets, stateMgr.checkpointedOffsets()); + assertEquals(expectedOffsets, stateMgr.checkpointed()); assertTrue(stateMgr.closed); } + + @Test + public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception { + final Map<TopicPartition, Long> expectedOffsets = new HashMap<>(); + expectedOffsets.put(t1, 102L); + expectedOffsets.put(t2, 100L); + globalStateTask.initialize(); + globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, 
"foo".getBytes(), "foo".getBytes())); + globalStateTask.flushState(); + assertThat(stateMgr.checkpointed(), equalTo(expectedOffsets)); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 602601a..a9998e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; @@ -49,6 +50,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -57,14 +60,10 @@ import static org.junit.Assert.assertFalse; public class ProcessorStateManagerTest { - private File baseDir; - private StateDirectory stateDirectory; - public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> { private final Serializer<Integer> serializer = new IntegerSerializer(); private TopicPartition assignedPartition = null; - private 
TopicPartition seekPartition = null; private long seekOffset = -1L; private boolean seekToBeginingCalled = false; private boolean seekToEndCalled = false; @@ -155,7 +154,6 @@ public class ProcessorStateManagerTest { if (seekOffset >= 0) throw new IllegalStateException("RestoreConsumer: offset already seeked"); - seekPartition = partition; seekOffset = offset; currentOffset = offset; super.seek(partition, offset); @@ -196,11 +194,32 @@ public class ProcessorStateManagerTest { private final String nonPersistentStoreName = "nonPersistentStore"; private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName); private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName); + private final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); + private final MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); + private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1); + private final String storeName = "mockStateStore"; + private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName); + private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0); + private final TaskId taskId = new TaskId(0, 1); + private final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName, true); + private File baseDir; + private File checkpointFile; + private StateDirectory stateDirectory; + private OffsetCheckpoint checkpoint; @Before public void setup() { baseDir = TestUtils.tempDirectory(); stateDirectory = new StateDirectory(applicationId, baseDir.getPath()); 
+ checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); + checkpoint = new OffsetCheckpoint(checkpointFile); + restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( + new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); } @After @@ -283,8 +302,6 @@ public class ProcessorStateManagerTest { public void testRegisterNonPersistentStore() throws IOException { long lastCheckpointedOffset = 10L; - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset)); @@ -296,8 +313,6 @@ public class ProcessorStateManagerTest { TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2); restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { { put(persistentStoreName, persistentStoreTopicName); @@ -308,7 +323,7 @@ public class ProcessorStateManagerTest { restoreConsumer.reset(); ArrayList<Integer> expectedKeys = new ArrayList<>(); - long offset = -1L; + long offset; for (int i = 1; i <= 3; i++) { offset = (long) (i + 100); int key = i; @@ -329,12 +344,13 @@ public class ProcessorStateManagerTest { } finally { stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); } - } @Test public void 
testChangeLogOffsets() throws IOException { final TaskId taskId = new TaskId(0, 0); + final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint( + new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); long lastCheckpointedOffset = 10L; String storeName1 = "store1"; String storeName2 = "store2"; @@ -349,10 +365,7 @@ public class ProcessorStateManagerTest { storeToChangelogTopic.put(storeName2, storeTopicName2); storeToChangelogTopic.put(storeName3, storeTopicName3); - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); - - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + offsetCheckpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList( new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0]) @@ -389,7 +402,7 @@ public class ProcessorStateManagerTest { stateMgr.register(store2, true, store2.stateRestoreCallback); stateMgr.register(store3, true, store3.stateRestoreCallback); - Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets(); + Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed(); assertEquals(3, changeLogOffsets.size()); assertTrue(changeLogOffsets.containsKey(partition1)); @@ -407,20 +420,12 @@ public class ProcessorStateManagerTest { @Test public void testGetStore() throws IOException { - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - - restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( - new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); - - MockStateStoreSupplier.MockStateStore mockStateStore = new 
MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - - ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); + final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); try { - stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); assertNull(stateMgr.getStore("noSuchStore")); - assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName)); + assertEquals(nonPersistentStore, stateMgr.getStore(nonPersistentStoreName)); } finally { stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); @@ -429,30 +434,15 @@ public class ProcessorStateManagerTest { @Test public void testFlushAndClose() throws IOException { - final TaskId taskId = new TaskId(0, 1); - File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); // write an empty checkpoint file - OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); - oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap()); - - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - - restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( - new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); - restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( - new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); + checkpoint.write(Collections.<TopicPartition, Long>emptyMap()); // set up ack'ed offsets - HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>(); + final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>(); ackedOffsets.put(new 
TopicPartition(persistentStoreTopicName, 1), 123L); ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L); ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L); - MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() { { put(persistentStoreName, persistentStoreTopicName); @@ -460,8 +450,8 @@ public class ProcessorStateManagerTest { } }); try { - // make sure the checkpoint file is deleted - assertFalse(checkpointFile.exists()); + // make sure the checkpoint file isn't deleted + assertTrue(checkpointFile.exists()); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); @@ -482,42 +472,122 @@ public class ProcessorStateManagerTest { assertTrue(checkpointFile.exists()); // the checkpoint file should contain an offset from the persistent store only. 
- OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile); - Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read(); + final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read(); assertEquals(1, checkpointedOffsets.size()); assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); } @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { - MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap()); - stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } + @Test - public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { - final TaskId taskId = new TaskId(0, 1); - final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); - // write an empty checkpoint file - final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); - oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap()); + public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception { + final Map<TopicPartition, Long> offsets = Collections.singletonMap(persistentStorePartition, 99L); + checkpoint.write(offsets); - final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.<String, String>emptyMap()); - restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( - new 
PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); + restoreConsumer.reset(); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + stateMgr.close(null); + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(offsets)); + } + @Test + public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.singletonMap(persistentStore.name(), + persistentStoreTopicName)); + restoreConsumer.reset(); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); - final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); + stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L)); + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 11L))); + } + + @Test + public void shouldWriteCheckpointForStandbyReplica() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.singletonMap(persistentStore.name(), + persistentStoreTopicName)); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); - stateMgr.close(null); - assertFalse(checkpointFile.exists()); + final byte[] bytes = Serdes.Integer().serializer().serialize("", 10); + stateMgr.updateStandbyStates(persistentStorePartition, + Collections.singletonList( + new ConsumerRecord<>(persistentStorePartition.topic(), + 
persistentStorePartition.partition(), + 888L, + bytes, + bytes))); + + stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap()); + + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L))); + + } + + @Test + public void shouldNotWriteCheckpointForNonPersistent() throws Exception { + final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1); + + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); + + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.singletonMap(nonPersistentStoreName, + nonPersistentStoreTopicName)); + + restoreConsumer.reset(); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); + stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L)); + + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap())); + } + + @Test + public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.<String, String>emptyMap()); + + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + + stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L)); + + final Map<TopicPartition, Long> read = checkpoint.read(); + assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap())); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 2d32e78..4c3356a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -53,6 +54,8 @@ import java.util.Properties; import java.util.Set; import static java.util.Collections.singleton; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -315,7 +318,7 @@ public class StandbyTaskTest { final String changelogName = "test-application-my-store-changelog"; final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0)); consumer.assign(partitions); - Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>(); + final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>(); committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L)); consumer.commitSync(committedOffsets); @@ -326,9 +329,51 @@ public class StandbyTaskTest { final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0); StreamsConfig config = createConfig(baseDir); new StandbyTask(taskId, applicationId, partitions, topology, consumer, 
restoreStateConsumer, config, - new MockStreamsMetrics(new Metrics()), stateDirectory); + new MockStreamsMetrics(new Metrics()), stateDirectory); } + + @Test + public void shouldCheckpointStoreOffsetsOnCommit() throws Exception { + consumer.assign(Utils.mkList(ktable)); + final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>(); + committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L)); + consumer.commitSync(committedOffsets); + + restoreStateConsumer.updatePartitions("ktable1", Utils.mkList( + new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]))); + + final TaskId taskId = new TaskId(0, 0); + final StreamsConfig config = createConfig(baseDir); + final StandbyTask task = new StandbyTask(taskId, + applicationId, + ktablePartitions, + ktableTopology, + consumer, + restoreStateConsumer, + config, + null, + stateDirectory + ); + + + restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); + + final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1); + task.update(ktable, Collections.singletonList(new ConsumerRecord<>(ktable.topic(), + ktable.partition(), + 50L, + serializedValue, + serializedValue))); + + task.commit(); + + final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), + ProcessorStateManager.CHECKPOINT_FILE_NAME)).read(); + assertThat(checkpoint, equalTo(Collections.singletonMap(ktable, 51L))); + + } + private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... 
recs) { return Arrays.asList(recs); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 0479b9d..7f27fc4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; @@ -39,6 +41,8 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; @@ -59,6 +63,8 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static 
org.junit.Assert.assertNotNull; @@ -437,6 +443,63 @@ public class StreamTaskTest { assertTrue(flushed.get()); } + + @SuppressWarnings("unchecked") + @Test + public void shouldCheckpointOffsetsOnCommit() throws Exception { + final String storeName = "test"; + final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName); + final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) { + @Override + public void init(final ProcessorContext context, final StateStore root) { + context.register(root, true, null); + } + + @Override + public boolean persistent() { + return true; + } + }; + final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(), + Collections.<String, SourceNode>emptyMap(), + Collections.<String, SinkNode>emptyMap(), + Collections.<StateStore>singletonList(inMemoryStore), + Collections.singletonMap(storeName, changelogTopic), + Collections.<StateStore>emptyList()); + + final TopicPartition partition = new TopicPartition(changelogTopic, 0); + final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { + @Override + public Map<TopicPartition, Long> offsets() { + + return Collections.singletonMap(partition, 543L); + } + }; + + restoreStateConsumer.updatePartitions(changelogTopic, + Collections.singletonList( + new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0]))); + restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); + restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L)); + + final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); + final TaskId taskId = new TaskId(0, 0); + final MockTime time = new MockTime(); + final StreamsConfig config = createConfig(baseDir); + final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer, + restoreStateConsumer, config, streamsMetrics, + stateDirectory, new ThreadCache("testCache", 
0, streamsMetrics), + time, recordCollector); + + time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + + streamTask.commit(); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), + ProcessorStateManager.CHECKPOINT_FILE_NAME)); + + assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L))); + } + private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) { return Arrays.asList(recs); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index 2f3ef26..612a0da 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -67,6 +67,11 @@ public class GlobalStateManagerStub implements GlobalStateManager { } @Override + public void checkpoint(final Map<TopicPartition, Long> offsets) { + this.offsets.putAll(offsets); + } + + @Override public StateStore getGlobalStore(final String name) { return null; } @@ -77,7 +82,7 @@ public class GlobalStateManagerStub implements GlobalStateManager { } @Override - public Map<TopicPartition, Long> checkpointedOffsets() { + public Map<TopicPartition, Long> checkpointed() { return offsets; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9eb0cdb5/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 89ca0df..7ace43a 100644 --- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -204,7 +204,8 @@ public class ProcessorTopologyTestDriver { final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); globalStateTask = new GlobalStateUpdateTask(globalTopology, new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), - stateManager); + stateManager + ); globalStateTask.initialize(); }
