Repository: kafka Updated Branches: refs/heads/trunk 2bf2348b5 -> 043951753
http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 20cf125..df8d201 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; @@ -33,8 +34,8 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockStateRestoreListener; -import org.apache.kafka.test.NoOpProcessorContext; import org.apache.kafka.test.NoOpReadOnlyStore; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -70,45 +71,57 @@ public class GlobalStateManagerImplTest { private final MockTime time = new MockTime(); private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener(); + private final String storeName1 = "t1-store"; + private final String storeName2 = "t2-store"; + private final String storeName3 = "t3-store"; + private final String storeName4 = "t4-store"; private final TopicPartition t1 = new TopicPartition("t1", 1); private final TopicPartition t2 = new TopicPartition("t2", 1); + private final TopicPartition t3 = new TopicPartition("t3", 1); + private final TopicPartition t4 = new TopicPartition("t4", 1); private GlobalStateManagerImpl stateManager; - private NoOpProcessorContext context; private StateDirectory stateDirectory; - private StreamsConfig config; - private NoOpReadOnlyStore<Object, Object> store1; - private NoOpReadOnlyStore store2; + private StreamsConfig streamsConfig; + private NoOpReadOnlyStore<Object, Object> store1, store2, store3, store4; private MockConsumer<byte[], byte[]> consumer; private File checkpointFile; private ProcessorTopology topology; + private MockProcessorContext mockProcessorContext; @Before public void before() throws IOException { final Map<String, String> storeToTopic = new HashMap<>(); - store1 = new NoOpReadOnlyStore<>("t1-store"); - store2 = new NoOpReadOnlyStore("t2-store"); - storeToTopic.put("t1-store", "t1"); - storeToTopic.put("t2-store", "t2"); - topology = ProcessorTopology.withGlobalStores(Utils.<StateStore>mkList(store1, store2), storeToTopic); + storeToTopic.put(storeName1, t1.topic()); + storeToTopic.put(storeName2, t2.topic()); + storeToTopic.put(storeName3, t3.topic()); + storeToTopic.put(storeName4, t4.topic()); - context = new NoOpProcessorContext(); - config = new StreamsConfig(new Properties() { + store1 = new NoOpReadOnlyStore<>(storeName1, true); + store2 = new NoOpReadOnlyStore<>(storeName2, true); + store3 = new NoOpReadOnlyStore<>(storeName3); + store4 = new NoOpReadOnlyStore<>(storeName4); + + topology = ProcessorTopology.withGlobalStores(Utils.<StateStore>mkList(store1, store2, store3, store4), storeToTopic); + + streamsConfig = new StreamsConfig(new Properties() { { put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); } }); - stateDirectory = new StateDirectory(config, time); - consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + stateDirectory = new StateDirectory(streamsConfig, time); + consumer = new MockConsumer<>(OffsetResetStrategy.NONE); stateManager = new GlobalStateManagerImpl( - new LogContext("mock"), + new LogContext("test"), topology, consumer, stateDirectory, stateRestoreListener, - config); + streamsConfig); + mockProcessorContext = new MockProcessorContext(stateDirectory.globalStateDir(), streamsConfig); + stateManager.setGlobalProcessorContext(mockProcessorContext); checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); } @@ -119,16 +132,16 @@ public class GlobalStateManagerImplTest { @Test public void shouldLockGlobalStateDirectory() { - stateManager.initialize(context); + stateManager.initialize(); assertTrue(new File(stateDirectory.globalStateDir(), ".lock").exists()); } @Test(expected = LockException.class) public void shouldThrowLockExceptionIfCantGetLock() throws IOException { - final StateDirectory stateDir = new StateDirectory(config, time); + final StateDirectory stateDir = new StateDirectory(streamsConfig, time); try { stateDir.lockGlobalState(); - stateManager.initialize(context); + stateManager.initialize(); } finally { stateDir.unlockGlobalState(); } @@ -138,7 +151,7 @@ public class GlobalStateManagerImplTest { public void shouldReadCheckpointOffsets() throws IOException { final Map<TopicPartition, Long> expected = writeCheckpoint(); - stateManager.initialize(context); + stateManager.initialize(); final Map<TopicPartition, Long> offsets = stateManager.checkpointed(); assertEquals(expected, offsets); } @@ -146,35 +159,35 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException { writeCheckpoint(); - stateManager.initialize(context); + stateManager.initialize(); assertTrue(checkpointFile.exists()); } @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws IOException { writeCorruptCheckpoint(); - stateManager.initialize(context); + stateManager.initialize(); } @Test public void shouldInitializeStateStores() { - stateManager.initialize(context); + stateManager.initialize(); assertTrue(store1.initialized); assertTrue(store2.initialized); } @Test public void shouldReturnInitializedStoreNames() { - final Set<String> storeNames = stateManager.initialize(context); - assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames); + final Set<String> storeNames = stateManager.initialize(); + assertEquals(Utils.mkSet(storeName1, storeName2, storeName3, storeName4), storeNames); } @Test public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() { - stateManager.initialize(context); + stateManager.initialize(); try { - stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), new TheStateRestoreCallback()); + stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), stateRestoreCallback); fail("should have raised an illegal argument exception as store is not in the topology"); } catch (final IllegalArgumentException e) { // pass @@ -183,11 +196,11 @@ public class GlobalStateManagerImplTest { @Test public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() { - stateManager.initialize(context); + stateManager.initialize(); initializeConsumer(2, 1, t1); - stateManager.register(store1, new TheStateRestoreCallback()); + stateManager.register(store1, stateRestoreCallback); try { - stateManager.register(store1, new TheStateRestoreCallback()); + stateManager.register(store1, stateRestoreCallback); fail("should have raised an illegal argument exception as store has already been registered"); } catch (final IllegalArgumentException e) { // pass @@ -196,9 +209,9 @@ public class GlobalStateManagerImplTest { @Test public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() { - stateManager.initialize(context); + stateManager.initialize(); try { - stateManager.register(store1, new TheStateRestoreCallback()); + stateManager.register(store1, stateRestoreCallback); fail("Should have raised a StreamsException as there are no partition for the store"); } catch (final StreamsException e) { // pass @@ -209,9 +222,23 @@ public class GlobalStateManagerImplTest { public void shouldRestoreRecordsUpToHighwatermark() { initializeConsumer(2, 1, t1); - stateManager.initialize(context); + stateManager.initialize(); + + stateManager.register(store1, stateRestoreCallback); + assertEquals(2, stateRestoreCallback.restored.size()); + } + + @Test + public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() { + initializeConsumer(2, 1, t1); + consumer.setException(new InvalidOffsetException("Try Again!") { + public Set<TopicPartition> partitions() { + return Collections.singleton(t1); + } + }); + + stateManager.initialize(); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); stateManager.register(store1, stateRestoreCallback); assertEquals(2, stateRestoreCallback.restored.size()); } @@ -219,9 +246,8 @@ public class GlobalStateManagerImplTest { @Test public void shouldListenForRestoreEvents() { initializeConsumer(5, 1, t1); - stateManager.initialize(context); + stateManager.initialize(); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); stateManager.register(store1, stateRestoreCallback); assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L)); @@ -242,8 +268,7 @@ public class GlobalStateManagerImplTest { ProcessorStateManager.CHECKPOINT_FILE_NAME)); offsetCheckpoint.write(Collections.singletonMap(t1, 6L)); - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); stateManager.register(store1, stateRestoreCallback); assertEquals(5, stateRestoreCallback.restored.size()); } @@ -251,8 +276,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldFlushStateStores() { - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); // register the stores initializeConsumer(1, 1, t1); stateManager.register(store1, stateRestoreCallback); @@ -266,8 +290,7 @@ public class GlobalStateManagerImplTest { @Test(expected = ProcessorStateException.class) public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() { - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); // register the stores initializeConsumer(1, 1, t1); stateManager.register(new NoOpReadOnlyStore(store1.name()) { @@ -282,8 +305,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldCloseStateStores() throws IOException { - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); // register the stores initializeConsumer(1, 1, t1); stateManager.register(store1, stateRestoreCallback); @@ -297,8 +319,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldWriteCheckpointsOnClose() throws IOException { - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); initializeConsumer(1, 1, t1); stateManager.register(store1, stateRestoreCallback); final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L); @@ -309,7 +330,7 @@ public class GlobalStateManagerImplTest { @Test(expected = ProcessorStateException.class) public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException { - stateManager.initialize(context); + stateManager.initialize(); initializeConsumer(1, 1, t1); stateManager.register(new NoOpReadOnlyStore(store1.name()) { @Override @@ -323,7 +344,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() { - stateManager.initialize(context); + stateManager.initialize(); try { stateManager.register(store1, null); fail("should have thrown due to null callback"); @@ -334,9 +355,9 @@ public class GlobalStateManagerImplTest { @Test public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException { - stateManager.initialize(context); + stateManager.initialize(); stateManager.close(Collections.<TopicPartition, Long>emptyMap()); - final StateDirectory stateDir = new StateDirectory(config, new MockTime()); + final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime()); try { // should be able to get the lock now as it should've been released in close assertTrue(stateDir.lockGlobalState()); @@ -347,7 +368,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException { - stateManager.initialize(context); + stateManager.initialize(); initializeConsumer(1, 1, t1); stateManager.register(new NoOpReadOnlyStore("t1-store") { @Override @@ -366,7 +387,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException { - stateManager.initialize(context); + stateManager.initialize(); initializeConsumer(1, 1, t1); initializeConsumer(1, 1, t2); final NoOpReadOnlyStore store = new NoOpReadOnlyStore("t1-store") { @@ -393,11 +414,11 @@ public class GlobalStateManagerImplTest { public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOException { writeCorruptCheckpoint(); try { - stateManager.initialize(context); + stateManager.initialize(); } catch (StreamsException e) { // expected } - final StateDirectory stateDir = new StateDirectory(config, new MockTime()); + final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime()); try { // should be able to get the lock now as it should've been released assertTrue(stateDir.lockGlobalState()); @@ -409,7 +430,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldCheckpointOffsets() throws IOException { final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L); - stateManager.initialize(context); + stateManager.initialize(); stateManager.checkpoint(offsets); @@ -420,8 +441,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() { - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); initializeConsumer(10, 1, t1); stateManager.register(store1, stateRestoreCallback); initializeConsumer(20, 1, t2); @@ -450,8 +470,7 @@ public class GlobalStateManagerImplTest { final byte[] expectedValue = "value".getBytes(); consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 2, expectedKey, expectedValue)); - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); stateManager.register(store1, stateRestoreCallback); final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0); assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value)))); @@ -459,8 +478,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldCheckpointRestoredOffsetsToFile() throws IOException { - stateManager.initialize(context); - final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + stateManager.initialize(); initializeConsumer(10, 1, t1); stateManager.register(store1, stateRestoreCallback); stateManager.close(Collections.<TopicPartition, Long>emptyMap()); @@ -478,15 +496,15 @@ public class GlobalStateManagerImplTest { @Test public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() { - stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(config, time) { + stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(streamsConfig, time) { @Override public boolean lockGlobalState() throws IOException { throw new IOException("KABOOM!"); } - }, stateRestoreListener, config); + }, stateRestoreListener, streamsConfig); try { - stateManager.initialize(context); + stateManager.initialize(); fail("Should have thrown LockException"); } catch (final LockException e) { // pass @@ -504,7 +522,7 @@ public class GlobalStateManagerImplTest { throw new TimeoutException(); } }; - config = new StreamsConfig(new Properties() { + streamsConfig = new StreamsConfig(new Properties() { { put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); @@ -520,7 +538,7 @@ public class GlobalStateManagerImplTest { consumer, stateDirectory, stateRestoreListener, - config); + streamsConfig); } catch (final StreamsException expected) { assertEquals(numberOfCalls.get(), retries); } @@ -537,7 +555,7 @@ public class GlobalStateManagerImplTest { throw new TimeoutException(); } }; - config = new StreamsConfig(new Properties() { + streamsConfig = new StreamsConfig(new Properties() { { put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); @@ -553,12 +571,74 @@ public class GlobalStateManagerImplTest { consumer, stateDirectory, stateRestoreListener, - config); + streamsConfig); } catch (final StreamsException expected) { assertEquals(numberOfCalls.get(), retries); } } + @Test + public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException { + final File storeDirectory1 = new File(stateDirectory.globalStateDir().getAbsolutePath() + + File.separator + "rocksdb" + + File.separator + storeName1); + final File storeDirectory2 = new File(stateDirectory.globalStateDir().getAbsolutePath() + + File.separator + "rocksdb" + + File.separator + storeName2); + final File storeDirectory3 = new File(stateDirectory.globalStateDir().getAbsolutePath() + + File.separator + storeName3); + final File storeDirectory4 = new File(stateDirectory.globalStateDir().getAbsolutePath() + + File.separator + storeName4); + final File testFile1 = new File(storeDirectory1.getAbsolutePath() + File.separator + "testFile"); + final File testFile2 = new File(storeDirectory2.getAbsolutePath() + File.separator + "testFile"); + final File testFile3 = new File(storeDirectory3.getAbsolutePath() + File.separator + "testFile"); + final File testFile4 = new File(storeDirectory4.getAbsolutePath() + File.separator + "testFile"); + + consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null))); + consumer.updatePartitions(t2.topic(), Collections.singletonList(new PartitionInfo(t2.topic(), t2.partition(), null, null, null))); + consumer.updatePartitions(t3.topic(), Collections.singletonList(new PartitionInfo(t3.topic(), t3.partition(), null, null, null))); + consumer.updatePartitions(t4.topic(), Collections.singletonList(new PartitionInfo(t4.topic(), t4.partition(), null, null, null))); + consumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() { + { + put(t1, 0L); + put(t2, 0L); + put(t3, 0L); + put(t4, 0L); + } + }); + consumer.updateEndOffsets(new HashMap<TopicPartition, Long>() { + { + put(t1, 0L); + put(t2, 0L); + put(t3, 0L); + put(t4, 0L); + } + }); + + stateManager.initialize(); + stateManager.register(store1, stateRestoreCallback); + stateManager.register(store2, stateRestoreCallback); + stateManager.register(store3, stateRestoreCallback); + stateManager.register(store4, stateRestoreCallback); + + testFile1.createNewFile(); + assertTrue(testFile1.exists()); + testFile2.createNewFile(); + assertTrue(testFile2.exists()); + testFile3.createNewFile(); + assertTrue(testFile3.exists()); + testFile4.createNewFile(); + assertTrue(testFile4.exists()); + + // only delete and recreate store 1 and 3 -- 2 and 4 must be untouched + stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), mockProcessorContext); + + assertFalse(testFile1.exists()); + assertTrue(testFile2.exists()); + assertFalse(testFile3.exists()); + assertTrue(testFile4.exists()); + } + private void writeCorruptCheckpoint() throws IOException { final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 2bd2d42..c71f469 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -16,17 +16,26 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.internals.InternalNameProvider; +import org.apache.kafka.streams.kstream.internals.KTableSource; +import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; +import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -36,7 +45,9 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Set; +import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -47,20 +58,49 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class GlobalStreamThreadTest { - private final KStreamBuilder builder = new KStreamBuilder(); - private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); + private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE); private final MockTime time = new MockTime(); private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener(); private GlobalStreamThread globalStreamThread; private StreamsConfig config; + private final static String GLOBAL_STORE_TOPIC_NAME = "foo"; + private final static String GLOBAL_STORE_NAME = "bar"; + private final TopicPartition topicPartition = new TopicPartition(GLOBAL_STORE_TOPIC_NAME, 0); + + @SuppressWarnings("unchecked") @Before public void before() { - builder.globalTable("foo", "bar"); + final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( + Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>with(null, null), + new InternalNameProvider() { + @Override + public String newProcessorName(String prefix) { + return "processorName"; + } + + @Override + public String newStoreName(String prefix) { + return GLOBAL_STORE_NAME; + } + }, + "store-"); + + builder.addGlobalStore( + (StoreBuilder) new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(), + "sourceName", + null, + null, + null, + GLOBAL_STORE_TOPIC_NAME, + "processorName", + new KTableSource<>(GLOBAL_STORE_NAME)); + final HashMap<String, Object> properties = new HashMap<>(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah"); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah"); - properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); config = new StreamsConfig(properties); globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(), config, @@ -115,7 +155,6 @@ public class GlobalStreamThreadTest { assertFalse(globalStreamThread.stillRunning()); } - @Test public void shouldBeRunningAfterSuccessfulStart() { initializeConsumer(); @@ -136,7 +175,7 @@ public class GlobalStreamThreadTest { public void shouldCloseStateStoresOnClose() throws InterruptedException { initializeConsumer(); globalStreamThread.start(); - final StateStore globalStore = builder.globalStateStores().get("bar"); + final StateStore globalStore = builder.globalStateStores().get(GLOBAL_STORE_NAME); assertTrue(globalStore.isOpen()); globalStreamThread.shutdown(); globalStreamThread.join(); @@ -146,7 +185,6 @@ public class GlobalStreamThreadTest { @SuppressWarnings("unchecked") @Test public void shouldTransitionToDeadOnClose() throws InterruptedException { - initializeConsumer(); globalStreamThread.start(); globalStreamThread.shutdown(); @@ -158,7 +196,6 @@ public class GlobalStreamThreadTest { @SuppressWarnings("unchecked") @Test public void shouldStayDeadAfterTwoCloses() throws InterruptedException { - initializeConsumer(); globalStreamThread.start(); globalStreamThread.shutdown(); @@ -170,8 +207,7 @@ public class GlobalStreamThreadTest { @SuppressWarnings("unchecked") @Test - public void shouldTransitiontoRunningOnStart() throws InterruptedException { - + public void shouldTransitionToRunningOnStart() throws InterruptedException { initializeConsumer(); globalStreamThread.start(); TestUtils.waitForCondition(new TestCondition() { @@ -183,19 +219,52 @@ public class GlobalStreamThreadTest { globalStreamThread.shutdown(); } + @Test + public void shouldDieOnInvalidOffsetException() throws Exception { + initializeConsumer(); + globalStreamThread.start(); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return globalStreamThread.state() == RUNNING; + } + }, 10 * 1000, "Thread never started."); + mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 1L)); + mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes())); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return mockConsumer.position(topicPartition) == 1L; + } + }, 10 * 1000, "Input record never consumed"); + + mockConsumer.setException(new InvalidOffsetException("Try Again!") { + @Override + public Set<TopicPartition> partitions() { + return Collections.singleton(topicPartition); + } + }); + // feed first record for recovery + mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes())); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return globalStreamThread.state() == DEAD; + } + }, 10 * 1000, "GlobalStreamThread should have died."); + } private void initializeConsumer() { - mockConsumer.updatePartitions("foo", Collections.singletonList(new PartitionInfo("foo", - 0, - null, - new Node[0], - new Node[0]))); - final TopicPartition topicPartition = new TopicPartition("foo", 0); + mockConsumer.updatePartitions(GLOBAL_STORE_TOPIC_NAME, Collections.singletonList(new PartitionInfo(GLOBAL_STORE_TOPIC_NAME, + 0, + null, + new Node[0], + new Node[0]))); mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L)); mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L)); + mockConsumer.assign(Collections.singleton(topicPartition)); } - - - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java index 2bb5b7b..725211d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java @@ -135,7 +135,7 @@ public class StateConsumerTest { private boolean flushed; private boolean closed; - public StateMaintainerStub(final Map<TopicPartition, Long> partitionOffsets) { + StateMaintainerStub(final Map<TopicPartition, Long> partitionOffsets) { this.partitionOffsets = partitionOffsets; } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java index 47a0015..b14731d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.Map; public class StateManagerStub implements StateManager { @@ -33,7 +34,12 @@ public class StateManagerStub implements StateManager { } @Override - public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {} + public void register(final StateStore store, + final StateRestoreCallback stateRestoreCallback) {} + + @Override + public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, + final InternalProcessorContext processorContext) {} @Override public void flush() {} http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 342354c..9f6f712 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; @@ -43,6 +44,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; @@ -52,6 +54,7 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -124,6 +127,30 @@ public class StoreChangelogReaderTest { } @Test + public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() { + final int messages = 10; + setupConsumer(messages, topicPartition); + consumer.setException(new InvalidOffsetException("Try Again!") { + @Override + public Set<TopicPartition> partitions() { + return Collections.singleton(topicPartition); + } + }); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, + "storeName")); + + EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task); + EasyMock.replay(active); + + // first restore call "fails" but we should not die with an exception + assertEquals(0, changelogReader.restore(active).size()); + // retry restore should succeed + assertEquals(1, changelogReader.restore(active).size()); + assertThat(callback.restored.size(), equalTo(messages)); + } + + + @Test public void shouldRestoreMessagesFromCheckpoint() { final int messages = 10; setupConsumer(messages, topicPartition); http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c852ae3..4aee8f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -35,9 +35,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -49,7 +46,6 @@ import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.MockStateStore; import org.apache.kafka.test.MockTimestampExtractor; -import org.apache.kafka.test.NoOpProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -465,12 +461,12 @@ public class StreamTaskTest { task.process(); fail("Should've thrown StreamsException"); } catch (final Exception e) { - assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue()); + assertThat(task.processorContext.currentNode(), nullValue()); } } @Test - public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() { + public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() { task = createStatelessTask(false); task.initialize(); @@ -483,32 +479,19 @@ public class StreamTaskTest { }); fail("Should've thrown StreamsException"); } catch (final StreamsException e) { - assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue()); + final String message = e.getMessage(); + assertTrue("message=" + message + " should contain processor", message.contains("processor '" + processorStreamTime.name() + "'")); + assertThat(task.processorContext.currentNode(), nullValue()); } } @Test - public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() { - final Processor<Object, Object> processor = new AbstractProcessor<Object, Object>() { - @Override - public void init(final ProcessorContext context) { - } - - @Override - public void process(final Object key, final Object value) {} - - @Override - public void punctuate(final long timestamp) {} - }; - - final ProcessorNode<Object, Object> punctuator = new ProcessorNode<>("test", processor, Collections.<String>emptySet()); - punctuator.init(new NoOpProcessorContext()); - + public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() { task = createStatelessTask(false); task.initialize(); try { - task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() { + task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() { @Override public void punctuate(long timestamp) { throw new KafkaException("KABOOM!"); @@ -516,7 +499,9 @@ public class StreamTaskTest { }); fail("Should've thrown StreamsException"); } catch (final StreamsException e) { - assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue()); + final String message = e.getMessage(); + assertTrue("message=" + message + " should contain processor", message.contains("processor '" + processorSystemTime.name() + "'")); + assertThat(task.processorContext.currentNode(), nullValue()); } } @@ -567,7 +552,7 @@ public class StreamTaskTest { public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() { task = createStatelessTask(false); task.initialize(); - ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime); + task.processorContext.setCurrentNode(processorStreamTime); try { task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator); fail("Should throw illegal state exception as current node is not null"); @@ -591,7 +576,7 @@ public class StreamTaskTest { task = createStatelessTask(false); task.initialize(); task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator); - assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue()); + assertThat(((ProcessorContextImpl) task.context()).currentNode(), nullValue()); } @Test(expected = IllegalStateException.class) @@ -608,7 +593,7 @@ public class StreamTaskTest { @Test public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() { task = createStatelessTask(false); - ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime); + task.processorContext.setCurrentNode(processorStreamTime); task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() { @Override public void punctuate(long timestamp) { @@ -744,7 +729,7 @@ public class StreamTaskTest { } @Test - public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception { + public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() { task = createStatelessTask(true); task.close(false, true); task = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 69ae07c..8bcd6fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; @@ -26,17 +27,20 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.MockTimestampExtractor; @@ -631,7 +635,8 @@ public class StreamThreadTest { @Test public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException { - internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one"); + internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) + .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one")); final StreamThread thread = createStreamThread(clientId, config, false); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; @@ -681,7 +686,8 @@ public class StreamThreadTest { @Test public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException { - internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one"); + internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) + .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one")); final StreamThread thread = createStreamThread(clientId, config, false); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; @@ -728,6 +734,97 @@ public class StreamThreadTest { assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED); } + @Test + public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception { + internalStreamsBuilder.stream(Collections.singleton("topic"), consumed) + .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count")); + + final StreamThread thread = createStreamThread("cliendId", config, false); + final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer; + final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer; + + final TopicPartition topicPartition = new TopicPartition("topic", 0); + final Set<TopicPartition> topicPartitionSet = Collections.singleton(topicPartition); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + activeTasks.put(new TaskId(0, 0), topicPartitionSet); + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + + mockConsumer.updatePartitions("topic", new ArrayList<PartitionInfo>() { + { + add(new PartitionInfo("topic", + 0, + null, + new Node[0], + new Node[0])); + } + }); + mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L)); + + mockRestoreConsumer.updatePartitions("stream-thread-test-count-changelog", new ArrayList<PartitionInfo>() { + { + add(new PartitionInfo("stream-thread-test-count-changelog", + 0, + null, + new Node[0], + new Node[0])); + } + }); + final TopicPartition changelogPartition = new TopicPartition("stream-thread-test-count-changelog", 0); + final Set<TopicPartition> changelogPartitionSet = Collections.singleton(changelogPartition); + mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L)); + mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L)); + + mockConsumer.schedulePollTask(new Runnable() { + @Override + public void run() { + thread.setState(StreamThread.State.PARTITIONS_REVOKED); + thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet); + } + }); + + try { + thread.start(); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return mockRestoreConsumer.assignment().size() == 1; + } + }, "Never restore first record"); + + mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes())); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return mockRestoreConsumer.position(changelogPartition) == 1L; + } + }, "Never restore first record"); + + mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") { + @Override + public Set<TopicPartition> partitions() { + return changelogPartitionSet; + } + }); + + mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes())); + mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 1L, "K2".getBytes(), "V2".getBytes())); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + mockRestoreConsumer.assign(changelogPartitionSet); + return mockRestoreConsumer.position(changelogPartition) == 2L; + } + }, "Never finished restore"); + } finally { + thread.shutdown(); + thread.join(10000); + } + } + private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) { assertEquals(state.name(), metadata.threadState()); assertTrue(metadata.activeTasks().isEmpty()); http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index 6677084..ae0b923 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.Map; import java.util.Set; @@ -40,20 +41,26 @@ public class GlobalStateManagerStub implements GlobalStateManager { } @Override - public Set<String> initialize(final InternalProcessorContext processorContext) { + public void setGlobalProcessorContext(InternalProcessorContext processorContext) {} + + @Override + public Set<String> initialize() { initialized = true; return storeNames; } - + + @Override + public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, + final InternalProcessorContext processorContext) {} + @Override public File baseDir() { return null; } @Override - public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) { - - } + public void register(final StateStore store, + final StateRestoreCallback stateRestoreCallback) {} @Override public void flush() {} http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index 0ada2e4..ae46b8d 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -21,21 +21,30 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import java.io.File; + public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, StateStore { private final String name; + private final boolean rocksdbStore; private boolean open = true; public boolean initialized; public boolean flushed; public NoOpReadOnlyStore() { - this(""); + this("", false); } public NoOpReadOnlyStore(final String name) { + this(name, false); + } + + public NoOpReadOnlyStore(final String name, + final boolean rocksdbStore) { this.name = name; + this.rocksdbStore = rocksdbStore; } @Override @@ -65,6 +74,12 @@ public class NoOpReadOnlyStore<K, V> @Override public void init(final ProcessorContext context, final StateStore root) { + if (rocksdbStore) { + // cf. RocksDBStore + new File(context.stateDir() + File.separator + "rocksdb" + File.separator + name).mkdirs(); + } else { + new File(context.stateDir() + File.separator + name).mkdir(); + } this.initialized = true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 887b10d..ae0cc9c 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -37,7 +37,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; -import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; @@ -155,11 +154,9 @@ public class ProcessorTopologyTestDriver { private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>(); private final Set<String> internalTopics = new HashSet<>(); private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>(); - private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener(); private StreamTask task; private GlobalStateUpdateTask globalStateTask; - /** * Create a new test diver instance * @param config the stream configuration for the topology @@ -227,8 +224,10 @@ public class ProcessorTopologyTestDriver { stateDirectory, stateRestoreListener, config); + final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache); + stateManager.setGlobalProcessorContext(globalProcessorContext); globalStateTask = new GlobalStateUpdateTask(globalTopology, - new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), + globalProcessorContext, stateManager, new LogAndContinueExceptionHandler(), new LogContext()); @@ -242,8 +241,8 @@ public class ProcessorTopologyTestDriver { consumer, new StoreChangelogReader( createRestoreConsumer(topology.storeToChangelogTopic()), - stateRestoreListener, - new LogContext("topology-test-driver ")), + new MockStateRestoreListener(), + new LogContext("topology-test-driver ")), config, streamsMetrics, stateDirectory, cache,