Repository: kafka Updated Branches: refs/heads/trunk 67a7ea9d6 -> 330274ed1
KAFKA-3229 ensure that root statestore is registered with ProcessorStateManager Pass through the root StateStore in the init method so the inner StateStore can register that object. Author: tomdearman <[email protected]> Reviewers: Yasuhiro Matsuda Closes #904 from tomdearman/KAFKA-3229 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/330274ed Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/330274ed Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/330274ed Branch: refs/heads/trunk Commit: 330274ed1c8efd2b1aa9907860429d9d20f72c3c Parents: 67a7ea9 Author: tomdearman <[email protected]> Authored: Thu Feb 11 11:35:55 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Thu Feb 11 11:35:55 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/streams/processor/StateStore.java | 2 +- .../apache/kafka/streams/processor/internals/AbstractTask.java | 2 +- .../streams/state/internals/InMemoryKeyValueLoggedStore.java | 5 +++-- .../streams/state/internals/InMemoryKeyValueStoreSupplier.java | 4 ++-- .../apache/kafka/streams/state/internals/MemoryLRUCache.java | 5 +++-- .../kafka/streams/state/internals/MeteredKeyValueStore.java | 5 +++-- .../kafka/streams/state/internals/MeteredWindowStore.java | 5 +++-- .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 5 +++-- .../kafka/streams/state/internals/RocksDBWindowStore.java | 5 +++-- .../streams/state/internals/InMemoryKeyValueStoreTest.java | 2 +- .../streams/state/internals/InMemoryLRUCacheStoreTest.java | 2 +- .../kafka/streams/state/internals/RocksDBKeyValueStoreTest.java | 2 +- .../kafka/streams/state/internals/RocksDBWindowStoreTest.java | 2 +- .../src/test/java/org/apache/kafka/test/KStreamTestDriver.java | 2 +- .../test/java/org/apache/kafka/test/MockStateStoreSupplier.java | 4 ++-- 15 files changed, 29 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 9c085a5..b07e510 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -38,7 +38,7 @@ public interface StateStore { /** * Initializes this state store */ - void init(ProcessorContext context); + void init(ProcessorContext context, StateStore root); /** * Flush any cached data http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 162a926..3f7140a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -71,7 +71,7 @@ public abstract class AbstractTask { protected void initializeStateStores() { for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) { StateStore store = stateStoreSupplier.get(); - store.init(this.processorContext); + store.init(this.processorContext, store); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index 94349bf..596cc2b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Serdes; @@ -46,10 +47,10 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { } @Override - public void init(ProcessorContext context) { + public void init(ProcessorContext context, StateStore root) { this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes); - inner.init(context); + inner.init(context, root); this.getter = new StoreChangeLogger.ValueGetter<K, V>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index 03290c1..0665af2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -88,9 +88,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { } @Override - public void init(ProcessorContext context) { + public void init(ProcessorContext context, StateStore root) { if (loggingEnabled) { - context.register(this, true, new StateRestoreCallback() { + context.register(root, true, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index aaa1efd..2a8be8c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Serdes; @@ -87,9 +88,9 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { } @Override - public void init(ProcessorContext context) { + public void init(ProcessorContext context, StateStore root) { if (loggingEnabled) { - context.register(this, true, new StateRestoreCallback() { + context.register(root, true, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index fd308c3..46feb58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -64,7 +65,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { } @Override - public void init(ProcessorContext context) { + public void init(ProcessorContext context, StateStore root) { final String name = name(); this.metrics = context.metrics(); this.putTime = this.metrics.addLatencySensor(metricScope, name, "put"); @@ -79,7 +80,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { // register and possibly restore the state from the logs long startNs = time.nanoseconds(); try { - inner.init(context); + inner.init(context, root); } finally { this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 33f4c88..37ae499 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -51,7 +52,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> { } @Override - public void init(ProcessorContext context) { + public void init(ProcessorContext context, StateStore root) { final String name = name(); this.metrics = context.metrics(); this.putTime = this.metrics.addLatencySensor(metricScope, name, "put"); @@ -62,7 +63,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> { // register and possibly restore the state from the logs long startNs = time.nanoseconds(); try { - inner.init(context); + inner.init(context, root); } finally { this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 11bf96e..999c9ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Serdes; @@ -142,7 +143,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { + public void init(ProcessorContext context, StateStore root) { // first open the DB dir openDB(context); @@ -176,7 +177,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } }; - context.register(this, loggingEnabled, new StateRestoreCallback() { + context.register(root, loggingEnabled, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 2758e6e..b1605a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.Serdes; import org.apache.kafka.streams.state.WindowStore; @@ -158,7 +159,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } @Override - public void init(ProcessorContext context) { + public void init(ProcessorContext context, StateStore root) { this.context = context; openExistingSegments(); @@ -167,7 +168,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { new RawStoreChangeLogger(name, context) : null; // register and possibly restore the state from the logs - context.register(this, loggingEnabled, new StateRestoreCallback() { + context.register(root, loggingEnabled, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { putInternal(key, value); http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index 2b0927e..46948bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -44,7 +44,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { } KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get(); - store.init(context); + store.init(context, store); return store; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index 10f31d6..a2b79e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -50,7 +50,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { } KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get(); - store.init(context); + store.init(context, store); return store; } http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index b9703db..8e8f69c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -45,7 +45,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { } KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get(); - store.init(context); + store.init(context, store); return store; } http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index fd55944..5a196ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -65,7 +65,7 @@ public class RocksDBWindowStoreTest { StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null); WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); - store.init(context); + store.init(context, store); return store; } http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 2dc567e..c0c5c39 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -59,7 +59,7 @@ public class KStreamTestDriver { for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) { StateStore store = stateStoreSupplier.get(); - store.init(context); + store.init(context, store); } for (ProcessorNode node : topology.processors()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java index 73d446f..7b31477 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java @@ -81,8 +81,8 @@ public class MockStateStoreSupplier implements StateStoreSupplier { } @Override - public void init(ProcessorContext context) { - context.register(this, loggingEnabled, stateRestoreCallback); + public void init(ProcessorContext context, StateStore root) { + context.register(root, loggingEnabled, stateRestoreCallback); initialized = true; }
