Repository: kafka
Updated Branches:
  refs/heads/trunk 2bf2348b5 -> 043951753


http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 20cf125..df8d201 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
@@ -33,8 +34,8 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpReadOnlyStore;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -70,45 +71,57 @@ public class GlobalStateManagerImplTest {
     private final MockTime time = new MockTime();
     private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
     private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
+    private final String storeName1 = "t1-store";
+    private final String storeName2 = "t2-store";
+    private final String storeName3 = "t3-store";
+    private final String storeName4 = "t4-store";
     private final TopicPartition t1 = new TopicPartition("t1", 1);
     private final TopicPartition t2 = new TopicPartition("t2", 1);
+    private final TopicPartition t3 = new TopicPartition("t3", 1);
+    private final TopicPartition t4 = new TopicPartition("t4", 1);
     private GlobalStateManagerImpl stateManager;
-    private NoOpProcessorContext context;
     private StateDirectory stateDirectory;
-    private StreamsConfig config;
-    private NoOpReadOnlyStore<Object, Object> store1;
-    private NoOpReadOnlyStore store2;
+    private StreamsConfig streamsConfig;
+    private NoOpReadOnlyStore<Object, Object> store1, store2, store3, store4;
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
     private ProcessorTopology topology;
+    private MockProcessorContext mockProcessorContext;
 
     @Before
     public void before() throws IOException {
         final Map<String, String> storeToTopic = new HashMap<>();
-        store1 = new NoOpReadOnlyStore<>("t1-store");
-        store2 = new NoOpReadOnlyStore("t2-store");
-        storeToTopic.put("t1-store", "t1");
-        storeToTopic.put("t2-store", "t2");
 
-        topology = ProcessorTopology.withGlobalStores(Utils.<StateStore>mkList(store1, store2), storeToTopic);
+        storeToTopic.put(storeName1, t1.topic());
+        storeToTopic.put(storeName2, t2.topic());
+        storeToTopic.put(storeName3, t3.topic());
+        storeToTopic.put(storeName4, t4.topic());
 
-        context = new NoOpProcessorContext();
-        config = new StreamsConfig(new Properties() {
+        store1 = new NoOpReadOnlyStore<>(storeName1, true);
+        store2 = new NoOpReadOnlyStore<>(storeName2, true);
+        store3 = new NoOpReadOnlyStore<>(storeName3);
+        store4 = new NoOpReadOnlyStore<>(storeName4);
+
+        topology = ProcessorTopology.withGlobalStores(Utils.<StateStore>mkList(store1, store2, store3, store4), storeToTopic);
+
+        streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
             }
         });
-        stateDirectory = new StateDirectory(config, time);
-        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        stateDirectory = new StateDirectory(streamsConfig, time);
+        consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
         stateManager = new GlobalStateManagerImpl(
-            new LogContext("mock"),
+            new LogContext("test"),
             topology,
             consumer,
             stateDirectory,
             stateRestoreListener,
-            config);
+            streamsConfig);
+        mockProcessorContext = new MockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
+        stateManager.setGlobalProcessorContext(mockProcessorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
     }
 
@@ -119,16 +132,16 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldLockGlobalStateDirectory() {
-        stateManager.initialize(context);
+        stateManager.initialize();
         assertTrue(new File(stateDirectory.globalStateDir(), ".lock").exists());
     }
 
     @Test(expected = LockException.class)
     public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
-        final StateDirectory stateDir = new StateDirectory(config, time);
-        final StateDirectory stateDir = new StateDirectory(streamsConfig, time);
         try {
             stateDir.lockGlobalState();
-            stateManager.initialize(context);
+            stateManager.initialize();
         } finally {
             stateDir.unlockGlobalState();
         }
@@ -138,7 +151,7 @@ public class GlobalStateManagerImplTest {
     public void shouldReadCheckpointOffsets() throws IOException {
         final Map<TopicPartition, Long> expected = writeCheckpoint();
 
-        stateManager.initialize(context);
+        stateManager.initialize();
         final Map<TopicPartition, Long> offsets = stateManager.checkpointed();
         assertEquals(expected, offsets);
     }
@@ -146,35 +159,35 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException {
         writeCheckpoint();
-        stateManager.initialize(context);
+        stateManager.initialize();
         assertTrue(checkpointFile.exists());
     }
 
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws IOException {
         writeCorruptCheckpoint();
-        stateManager.initialize(context);
+        stateManager.initialize();
     }
 
     @Test
     public void shouldInitializeStateStores() {
-        stateManager.initialize(context);
+        stateManager.initialize();
         assertTrue(store1.initialized);
         assertTrue(store2.initialized);
     }
 
     @Test
     public void shouldReturnInitializedStoreNames() {
-        final Set<String> storeNames = stateManager.initialize(context);
-        assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames);
+        final Set<String> storeNames = stateManager.initialize();
+        assertEquals(Utils.mkSet(storeName1, storeName2, storeName3, storeName4), storeNames);
     }
 
     @Test
     public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() {
-        stateManager.initialize(context);
+        stateManager.initialize();
 
         try {
-            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), new TheStateRestoreCallback());
+            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), stateRestoreCallback);
             fail("should have raised an illegal argument exception as store is not in the topology");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -183,11 +196,11 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
-        stateManager.initialize(context);
+        stateManager.initialize();
         initializeConsumer(2, 1, t1);
-        stateManager.register(store1, new TheStateRestoreCallback());
+        stateManager.register(store1, stateRestoreCallback);
         try {
-            stateManager.register(store1, new TheStateRestoreCallback());
+            stateManager.register(store1, stateRestoreCallback);
             fail("should have raised an illegal argument exception as store has already been registered");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -196,9 +209,9 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
-        stateManager.initialize(context);
+        stateManager.initialize();
         try {
-            stateManager.register(store1, new TheStateRestoreCallback());
+            stateManager.register(store1, stateRestoreCallback);
             fail("Should have raised a StreamsException as there are no partitions for the store");
         } catch (final StreamsException e) {
             // pass
@@ -209,9 +222,23 @@ public class GlobalStateManagerImplTest {
     public void shouldRestoreRecordsUpToHighwatermark() {
         initializeConsumer(2, 1, t1);
 
-        stateManager.initialize(context);
+        stateManager.initialize();
+
+        stateManager.register(store1, stateRestoreCallback);
+        assertEquals(2, stateRestoreCallback.restored.size());
+    }
+
+    @Test
+    public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
+        initializeConsumer(2, 1, t1);
+        consumer.setException(new InvalidOffsetException("Try Again!") {
+            public Set<TopicPartition> partitions() {
+                return Collections.singleton(t1);
+            }
+        });
+
+        stateManager.initialize();
 
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         stateManager.register(store1, stateRestoreCallback);
         assertEquals(2, stateRestoreCallback.restored.size());
     }
@@ -219,9 +246,8 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldListenForRestoreEvents() {
         initializeConsumer(5, 1, t1);
-        stateManager.initialize(context);
+        stateManager.initialize();
 
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         stateManager.register(store1, stateRestoreCallback);
 
         assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L));
@@ -242,8 +268,7 @@ public class GlobalStateManagerImplTest {
                                                                                ProcessorStateManager.CHECKPOINT_FILE_NAME));
         offsetCheckpoint.write(Collections.singletonMap(t1, 6L));
 
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         stateManager.register(store1,  stateRestoreCallback);
         assertEquals(5, stateRestoreCallback.restored.size());
     }
@@ -251,8 +276,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldFlushStateStores() {
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         // register the stores
         initializeConsumer(1, 1, t1);
         stateManager.register(store1, stateRestoreCallback);
@@ -266,8 +290,7 @@ public class GlobalStateManagerImplTest {
 
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() {
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         // register the stores
         initializeConsumer(1, 1, t1);
         stateManager.register(new NoOpReadOnlyStore(store1.name()) {
@@ -282,8 +305,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldCloseStateStores() throws IOException {
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         // register the stores
         initializeConsumer(1, 1, t1);
         stateManager.register(store1, stateRestoreCallback);
@@ -297,8 +319,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldWriteCheckpointsOnClose() throws IOException {
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         initializeConsumer(1, 1, t1);
         stateManager.register(store1, stateRestoreCallback);
         final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
@@ -309,7 +330,7 @@ public class GlobalStateManagerImplTest {
 
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
-        stateManager.initialize(context);
+        stateManager.initialize();
         initializeConsumer(1, 1, t1);
         stateManager.register(new NoOpReadOnlyStore(store1.name()) {
             @Override
@@ -323,7 +344,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
-        stateManager.initialize(context);
+        stateManager.initialize();
         try {
             stateManager.register(store1, null);
             fail("should have thrown due to null callback");
@@ -334,9 +355,9 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
-        stateManager.initialize(context);
+        stateManager.initialize();
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-        final StateDirectory stateDir = new StateDirectory(config, new MockTime());
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime());
         try {
             // should be able to get the lock now as it should've been released in close
             assertTrue(stateDir.lockGlobalState());
@@ -347,7 +368,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException {
-        stateManager.initialize(context);
+        stateManager.initialize();
         initializeConsumer(1, 1, t1);
         stateManager.register(new NoOpReadOnlyStore("t1-store") {
             @Override
@@ -366,7 +387,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException {
-        stateManager.initialize(context);
+        stateManager.initialize();
         initializeConsumer(1, 1, t1);
         initializeConsumer(1, 1, t2);
         final NoOpReadOnlyStore store = new NoOpReadOnlyStore("t1-store") {
@@ -393,11 +414,11 @@ public class GlobalStateManagerImplTest {
     public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOException {
         writeCorruptCheckpoint();
         try {
-            stateManager.initialize(context);
+            stateManager.initialize();
         } catch (StreamsException e) {
             // expected
         }
-        final StateDirectory stateDir = new StateDirectory(config, new MockTime());
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime());
         try {
             // should be able to get the lock now as it should've been released
             assertTrue(stateDir.lockGlobalState());
@@ -409,7 +430,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldCheckpointOffsets() throws IOException {
         final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
-        stateManager.initialize(context);
+        stateManager.initialize();
 
         stateManager.checkpoint(offsets);
 
@@ -420,8 +441,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         initializeConsumer(10, 1, t1);
         stateManager.register(store1, stateRestoreCallback);
         initializeConsumer(20, 1, t2);
@@ -450,8 +470,7 @@ public class GlobalStateManagerImplTest {
         final byte[] expectedValue = "value".getBytes();
         consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 2, expectedKey, expectedValue));
 
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         stateManager.register(store1, stateRestoreCallback);
         final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
         assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
@@ -459,8 +478,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
-        stateManager.initialize(context);
-        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.initialize();
         initializeConsumer(10, 1, t1);
         stateManager.register(store1, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
@@ -478,15 +496,15 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
-        stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(config, time) {
+        stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(streamsConfig, time) {
             @Override
             public boolean lockGlobalState() throws IOException {
                 throw new IOException("KABOOM!");
             }
-        }, stateRestoreListener, config);
+        }, stateRestoreListener, streamsConfig);
 
         try {
-            stateManager.initialize(context);
+            stateManager.initialize();
             fail("Should have thrown LockException");
         } catch (final LockException e) {
             // pass
@@ -504,7 +522,7 @@ public class GlobalStateManagerImplTest {
                 throw new TimeoutException();
             }
         };
-        config = new StreamsConfig(new Properties() {
+        streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
@@ -520,7 +538,7 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                config);
+                streamsConfig);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -537,7 +555,7 @@ public class GlobalStateManagerImplTest {
                 throw new TimeoutException();
             }
         };
-        config = new StreamsConfig(new Properties() {
+        streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
@@ -553,12 +571,74 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                config);
+                streamsConfig);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
     }
 
+    @Test
+    public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException {
+        final File storeDirectory1 = new File(stateDirectory.globalStateDir().getAbsolutePath()
+            + File.separator + "rocksdb"
+            + File.separator + storeName1);
+        final File storeDirectory2 = new File(stateDirectory.globalStateDir().getAbsolutePath()
+            + File.separator + "rocksdb"
+            + File.separator + storeName2);
+        final File storeDirectory3 = new File(stateDirectory.globalStateDir().getAbsolutePath()
+            + File.separator + storeName3);
+        final File storeDirectory4 = new File(stateDirectory.globalStateDir().getAbsolutePath()
+            + File.separator + storeName4);
+        final File testFile1 = new File(storeDirectory1.getAbsolutePath() + File.separator + "testFile");
+        final File testFile2 = new File(storeDirectory2.getAbsolutePath() + File.separator + "testFile");
+        final File testFile3 = new File(storeDirectory3.getAbsolutePath() + File.separator + "testFile");
+        final File testFile4 = new File(storeDirectory4.getAbsolutePath() + File.separator + "testFile");
+
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
+        consumer.updatePartitions(t2.topic(), Collections.singletonList(new PartitionInfo(t2.topic(), t2.partition(), null, null, null)));
+        consumer.updatePartitions(t3.topic(), Collections.singletonList(new PartitionInfo(t3.topic(), t3.partition(), null, null, null)));
+        consumer.updatePartitions(t4.topic(), Collections.singletonList(new PartitionInfo(t4.topic(), t4.partition(), null, null, null)));
+        consumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() {
+            {
+                put(t1, 0L);
+                put(t2, 0L);
+                put(t3, 0L);
+                put(t4, 0L);
+            }
+        });
+        consumer.updateEndOffsets(new HashMap<TopicPartition, Long>() {
+            {
+                put(t1, 0L);
+                put(t2, 0L);
+                put(t3, 0L);
+                put(t4, 0L);
+            }
+        });
+
+        stateManager.initialize();
+        stateManager.register(store1, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
+        stateManager.register(store3, stateRestoreCallback);
+        stateManager.register(store4, stateRestoreCallback);
+
+        testFile1.createNewFile();
+        assertTrue(testFile1.exists());
+        testFile2.createNewFile();
+        assertTrue(testFile2.exists());
+        testFile3.createNewFile();
+        assertTrue(testFile3.exists());
+        testFile4.createNewFile();
+        assertTrue(testFile4.exists());
+
+        // only delete and recreate store 1 and 3 -- 2 and 4 must be untouched
+        stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), mockProcessorContext);
+
+        assertFalse(testFile1.exists());
+        assertTrue(testFile2.exists());
+        assertFalse(testFile3.exists());
+        assertTrue(testFile4.exists());
+    }
+
     private void writeCorruptCheckpoint() throws IOException {
         final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 2bd2d42..c71f469 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -16,17 +16,26 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -36,7 +45,9 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -47,20 +58,49 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class GlobalStreamThreadTest {
-    private final KStreamBuilder builder = new KStreamBuilder();
-    private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+    private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
     private final MockTime time = new MockTime();
     private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private GlobalStreamThread globalStreamThread;
     private StreamsConfig config;
 
+    private final static String GLOBAL_STORE_TOPIC_NAME = "foo";
+    private final static String GLOBAL_STORE_NAME = "bar";
+    private final TopicPartition topicPartition = new TopicPartition(GLOBAL_STORE_TOPIC_NAME, 0);
+
+    @SuppressWarnings("unchecked")
     @Before
     public void before() {
-        builder.globalTable("foo", "bar");
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>with(null, null),
+            new InternalNameProvider() {
+                @Override
+                public String newProcessorName(String prefix) {
+                    return "processorName";
+                }
+
+                @Override
+                public String newStoreName(String prefix) {
+                    return GLOBAL_STORE_NAME;
+                }
+            },
+            "store-");
+
+        builder.addGlobalStore(
+            (StoreBuilder) new KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
+            "sourceName",
+            null,
+            null,
+            null,
+            GLOBAL_STORE_TOPIC_NAME,
+            "processorName",
+            new KTableSource<>(GLOBAL_STORE_NAME));
+
         final HashMap<String, Object> properties = new HashMap<>();
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah");
-        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         config = new StreamsConfig(properties);
         globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
@@ -115,7 +155,6 @@ public class GlobalStreamThreadTest {
         assertFalse(globalStreamThread.stillRunning());
     }
 
-
     @Test
     public void shouldBeRunningAfterSuccessfulStart() {
         initializeConsumer();
@@ -136,7 +175,7 @@ public class GlobalStreamThreadTest {
     public void shouldCloseStateStoresOnClose() throws InterruptedException {
         initializeConsumer();
         globalStreamThread.start();
-        final StateStore globalStore = builder.globalStateStores().get("bar");
+        final StateStore globalStore = builder.globalStateStores().get(GLOBAL_STORE_NAME);
         assertTrue(globalStore.isOpen());
         globalStreamThread.shutdown();
         globalStreamThread.join();
@@ -146,7 +185,6 @@ public class GlobalStreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldTransitionToDeadOnClose() throws InterruptedException {
-
         initializeConsumer();
         globalStreamThread.start();
         globalStreamThread.shutdown();
@@ -158,7 +196,6 @@ public class GlobalStreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldStayDeadAfterTwoCloses() throws InterruptedException {
-
         initializeConsumer();
         globalStreamThread.start();
         globalStreamThread.shutdown();
@@ -170,8 +207,7 @@ public class GlobalStreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldTransitiontoRunningOnStart() throws InterruptedException {
-
+    public void shouldTransitionToRunningOnStart() throws InterruptedException {
         initializeConsumer();
         globalStreamThread.start();
         TestUtils.waitForCondition(new TestCondition() {
@@ -183,19 +219,52 @@ public class GlobalStreamThreadTest {
         globalStreamThread.shutdown();
     }
 
+    @Test
+    public void shouldDieOnInvalidOffsetException() throws Exception {
+        initializeConsumer();
+        globalStreamThread.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return globalStreamThread.state() == RUNNING;
+            }
+        }, 10 * 1000, "Thread never started.");
 
+        mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 1L));
+        mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return mockConsumer.position(topicPartition) == 1L;
+            }
+        }, 10 * 1000, "Input record never consumed");
+
+        mockConsumer.setException(new InvalidOffsetException("Try Again!") {
+            @Override
+            public Set<TopicPartition> partitions() {
+                return Collections.singleton(topicPartition);
+            }
+        });
+        // feed first record for recovery
+        mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return globalStreamThread.state() == DEAD;
+            }
+        }, 10 * 1000, "GlobalStreamThread should have died.");
+    }
 
     private void initializeConsumer() {
-        mockConsumer.updatePartitions("foo", Collections.singletonList(new PartitionInfo("foo",
-                                                                                         0,
-                                                                                         null,
-                                                                                         new Node[0],
-                                                                                         new Node[0])));
-        final TopicPartition topicPartition = new TopicPartition("foo", 0);
+        mockConsumer.updatePartitions(GLOBAL_STORE_TOPIC_NAME, Collections.singletonList(new PartitionInfo(GLOBAL_STORE_TOPIC_NAME,
+            0,
+            null,
+            new Node[0],
+            new Node[0])));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
         mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
+        mockConsumer.assign(Collections.singleton(topicPartition));
     }
-
-
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 2bb5b7b..725211d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -135,7 +135,7 @@ public class StateConsumerTest {
         private boolean flushed;
         private boolean closed;
 
-        public StateMaintainerStub(final Map<TopicPartition, Long> partitionOffsets) {
+        StateMaintainerStub(final Map<TopicPartition, Long> partitionOffsets) {
             this.partitionOffsets = partitionOffsets;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index 47a0015..b14731d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
 public class StateManagerStub implements StateManager {
@@ -33,7 +34,12 @@ public class StateManagerStub implements StateManager {
     }
 
     @Override
-    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {}
+    public void register(final StateStore store,
+                         final StateRestoreCallback stateRestoreCallback) {}
+
+    @Override
+    public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions,
+                                                     final InternalProcessorContext processorContext) {}
 
     @Override
     public void flush() {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 342354c..9f6f712 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
@@ -43,6 +44,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
@@ -52,6 +54,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -124,6 +127,30 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
+    public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        consumer.setException(new InvalidOffsetException("Try Again!") {
+            @Override
+            public Set<TopicPartition> partitions() {
+                return Collections.singleton(topicPartition);
+            }
+        });
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+            "storeName"));
+
+        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        EasyMock.replay(active);
+
+        // first restore call "fails" but we should not die with an exception
+        assertEquals(0, changelogReader.restore(active).size());
+        // retry restore should succeed
+        assertEquals(1, changelogReader.restore(active).size());
+        assertThat(callback.restored.size(), equalTo(messages));
+    }
+
+
+    @Test
     public void shouldRestoreMessagesFromCheckpoint() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index c852ae3..4aee8f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,9 +35,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -49,7 +46,6 @@ import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockStateStore;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -465,12 +461,12 @@ public class StreamTaskTest {
             task.process();
             fail("Should've thrown StreamsException");
         } catch (final Exception e) {
-            assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
+            assertThat(task.processorContext.currentNode(), nullValue());
         }
     }
 
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
         task = createStatelessTask(false);
         task.initialize();
 
@@ -483,32 +479,19 @@ public class StreamTaskTest {
             });
             fail("Should've thrown StreamsException");
         } catch (final StreamsException e) {
-            assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
+            final String message = e.getMessage();
+            assertTrue("message=" + message + " should contain processor", message.contains("processor '" + processorStreamTime.name() + "'"));
+            assertThat(task.processorContext.currentNode(), nullValue());
         }
     }
 
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
-        final Processor<Object, Object> processor = new AbstractProcessor<Object, Object>() {
-            @Override
-            public void init(final ProcessorContext context) {
-            }
-
-            @Override
-            public void process(final Object key, final Object value) {}
-
-            @Override
-            public void punctuate(final long timestamp) {}
-        };
-
-        final ProcessorNode<Object, Object> punctuator = new ProcessorNode<>("test", processor, Collections.<String>emptySet());
-        punctuator.init(new NoOpProcessorContext());
-
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
         task = createStatelessTask(false);
         task.initialize();
 
         try {
-            task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() {
+            task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
                 @Override
                 public void punctuate(long timestamp) {
                     throw new KafkaException("KABOOM!");
@@ -516,7 +499,9 @@ public class StreamTaskTest {
             });
             fail("Should've thrown StreamsException");
         } catch (final StreamsException e) {
-            assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
+            final String message = e.getMessage();
+            assertTrue("message=" + message + " should contain processor", message.contains("processor '" + processorSystemTime.name() + "'"));
+            assertThat(task.processorContext.currentNode(), nullValue());
         }
     }
 
@@ -567,7 +552,7 @@ public class StreamTaskTest {
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         task = createStatelessTask(false);
         task.initialize();
-        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
+        task.processorContext.setCurrentNode(processorStreamTime);
         try {
             task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
             fail("Should throw illegal state exception as current node is not null");
@@ -591,7 +576,7 @@ public class StreamTaskTest {
         task = createStatelessTask(false);
         task.initialize();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
-        assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
+        assertThat(((ProcessorContextImpl) task.context()).currentNode(), nullValue());
     }
 
     @Test(expected = IllegalStateException.class)
@@ -608,7 +593,7 @@ public class StreamTaskTest {
     @Test
     public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
         task = createStatelessTask(false);
-        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
+        task.processorContext.setCurrentNode(processorStreamTime);
         task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
             @Override
             public void punctuate(long timestamp) {
@@ -744,7 +729,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception {
+    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() {
         task = createStatelessTask(true);
         task.close(false, true);
         task = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 69ae07c..8bcd6fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -26,17 +27,20 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -631,7 +635,8 @@ public class StreamThreadTest {
 
     @Test
     public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
-        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one");
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -681,7 +686,8 @@ public class StreamThreadTest {
 
     @Test
     public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
-        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one");
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -728,6 +734,97 @@ public class StreamThreadTest {
         assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
     }
 
+    @Test
+    public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
+        internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
+            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count"));
+
+        final StreamThread thread = createStreamThread("clientId", config, false);
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer;
+
+        final TopicPartition topicPartition = new TopicPartition("topic", 0);
+        final Set<TopicPartition> topicPartitionSet = Collections.singleton(topicPartition);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(new TaskId(0, 0), topicPartitionSet);
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        mockConsumer.updatePartitions("topic", new ArrayList<PartitionInfo>() {
+            {
+                add(new PartitionInfo("topic",
+                    0,
+                    null,
+                    new Node[0],
+                    new Node[0]));
+            }
+        });
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
+
+        mockRestoreConsumer.updatePartitions("stream-thread-test-count-changelog", new ArrayList<PartitionInfo>() {
+            {
+                add(new PartitionInfo("stream-thread-test-count-changelog",
+                    0,
+                    null,
+                    new Node[0],
+                    new Node[0]));
+            }
+        });
+        final TopicPartition changelogPartition = new TopicPartition("stream-thread-test-count-changelog", 0);
+        final Set<TopicPartition> changelogPartitionSet = Collections.singleton(changelogPartition);
+        mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L));
+        mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));
+
+        mockConsumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+                thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet);
+            }
+        });
+
+        try {
+            thread.start();
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return mockRestoreConsumer.assignment().size() == 1;
+                }
+            }, "Never restore first record");
+
+            mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return mockRestoreConsumer.position(changelogPartition) == 1L;
+                }
+            }, "Never restore first record");
+
+            mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") {
+                @Override
+                public Set<TopicPartition> partitions() {
+                    return changelogPartitionSet;
+                }
+            });
+
+            mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
+            mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 1L, "K2".getBytes(), "V2".getBytes()));
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    mockRestoreConsumer.assign(changelogPartitionSet);
+                    return mockRestoreConsumer.position(changelogPartition) == 2L;
+                }
+            }, "Never finished restore");
+        } finally {
+            thread.shutdown();
+            thread.join(10000);
+        }
+    }
+
     private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) {
         assertEquals(state.name(), metadata.threadState());
         assertTrue(metadata.activeTasks().isEmpty());

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index 6677084..ae0b923 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -40,20 +41,26 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     }
 
     @Override
-    public Set<String> initialize(final InternalProcessorContext processorContext) {
+    public void setGlobalProcessorContext(InternalProcessorContext processorContext) {}
+
+    @Override
+    public Set<String> initialize() {
         initialized = true;
         return storeNames;
     }
-    
+
+    @Override
+    public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions,
+                                                     final InternalProcessorContext processorContext) {}
+
     @Override
     public File baseDir() {
         return null;
     }
 
     @Override
-    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
-
-    }
+    public void register(final StateStore store,
+                         final StateRestoreCallback stateRestoreCallback) {}
 
     @Override
     public void flush() {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index 0ada2e4..ae46b8d 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -21,21 +21,30 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
+import java.io.File;
+
 public class NoOpReadOnlyStore<K, V>
         implements ReadOnlyKeyValueStore<K, V>, StateStore {
 
     private final String name;
+    private final boolean rocksdbStore;
     private boolean open = true;
     public boolean initialized;
     public boolean flushed;
 
 
     public NoOpReadOnlyStore() {
-        this("");
+        this("", false);
     }
 
     public NoOpReadOnlyStore(final String name) {
+        this(name, false);
+    }
+
+    public NoOpReadOnlyStore(final String name,
+                             final boolean rocksdbStore) {
         this.name = name;
+        this.rocksdbStore = rocksdbStore;
     }
 
     @Override
@@ -65,6 +74,12 @@ public class NoOpReadOnlyStore<K, V>
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
+        if (rocksdbStore) {
+            // cf. RocksDBStore
+            new File(context.stateDir() + File.separator + "rocksdb" + File.separator + name).mkdirs();
+        } else {
+            new File(context.stateDir() + File.separator + name).mkdir();
+        }
         this.initialized = true;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04395175/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 887b10d..ae0cc9c 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
-import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
@@ -155,11 +154,9 @@ public class ProcessorTopologyTestDriver {
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
     private final Set<String> internalTopics = new HashSet<>();
     private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
-    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private StreamTask task;
     private GlobalStateUpdateTask globalStateTask;
 
-
     /**
      * Create a new test driver instance
      * @param config the stream configuration for the topology
@@ -227,8 +224,10 @@ public class ProcessorTopologyTestDriver {
                                                                                   stateDirectory,
                                                                                   stateRestoreListener,
                                                                                   config);
+            final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache);
+            stateManager.setGlobalProcessorContext(globalProcessorContext);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
-                                                        new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
+                                                        globalProcessorContext,
                                                         stateManager,
                                                         new LogAndContinueExceptionHandler(),
                                                         new LogContext());
@@ -242,8 +241,8 @@ public class ProcessorTopologyTestDriver {
                                   consumer,
                                   new StoreChangelogReader(
                                       createRestoreConsumer(topology.storeToChangelogTopic()),
-                                      stateRestoreListener,
-                                          new LogContext("topology-test-driver ")),
+                                      new MockStateRestoreListener(),
+                                      new LogContext("topology-test-driver ")),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,
