This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new fa585d2  KAFKA-10306: GlobalThread should fail on 
InvalidOffsetException (#9075)
fa585d2 is described below

commit fa585d2773cb1e0f53194141e2e142fdbd9d3bd4
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Sun Jul 26 11:58:40 2020 -0700

    KAFKA-10306: GlobalThread should fail on InvalidOffsetException (#9075)
    
    * KAFKA-10306: GlobalThread should fail on InvalidOffsetException
    
    * Update 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
    
    Co-authored-by: John Roesler <vvcep...@users.noreply.github.com>
    
    * Update 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
    
    Co-authored-by: John Roesler <vvcep...@users.noreply.github.com>
    
    * Update 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
    
    Co-authored-by: John Roesler <vvcep...@users.noreply.github.com>
    
    * Update 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
    
    Co-authored-by: John Roesler <vvcep...@users.noreply.github.com>
---
 .../processor/internals/GlobalStateMaintainer.java |  2 +-
 .../internals/GlobalStateManagerImpl.java          | 29 +++-------
 .../processor/internals/GlobalStateUpdateTask.java | 24 ++++++--
 .../processor/internals/GlobalStreamThread.java    | 64 +++++++++++++++-------
 .../internals/GlobalStateManagerImplTest.java      | 16 ------
 .../processor/internals/GlobalStateTaskTest.java   | 28 ++++++++--
 .../internals/GlobalStreamThreadTest.java          | 46 +++++++++++-----
 .../processor/internals/StateConsumerTest.java     | 13 ++++-
 .../apache/kafka/test/GlobalStateManagerStub.java  | 10 +++-
 .../apache/kafka/streams/TopologyTestDriver.java   |  6 +-
 10 files changed, 147 insertions(+), 91 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
index acb32f7..9a8aab6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
@@ -31,7 +31,7 @@ interface GlobalStateMaintainer {
 
     void flushState();
 
-    void close() throws IOException;
+    void close(final boolean wipeStateStore) throws IOException;
 
     void update(ConsumerRecord<byte[], byte[]> record);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 1def55c..6f4a2c3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -291,27 +290,17 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
-                try {
-                    final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollTime);
-                    final List<ConsumerRecord<byte[], byte[]>> restoreRecords 
= new ArrayList<>();
-                    for (final ConsumerRecord<byte[], byte[]> record : 
records.records(topicPartition)) {
-                        if (record.key() != null) {
-                            
restoreRecords.add(recordConverter.convert(record));
-                        }
+                final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollTime);
+                final List<ConsumerRecord<byte[], byte[]>> restoreRecords = 
new ArrayList<>();
+                for (final ConsumerRecord<byte[], byte[]> record : 
records.records(topicPartition)) {
+                    if (record.key() != null) {
+                        restoreRecords.add(recordConverter.convert(record));
                     }
-                    offset = globalConsumer.position(topicPartition);
-                    stateRestoreAdapter.restoreBatch(restoreRecords);
-                    stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
-                    restoreCount += restoreRecords.size();
-                } catch (final InvalidOffsetException recoverableException) {
-                    log.warn("Restoring GlobalStore {} failed due to: {}. 
Deleting global store to recreate from scratch.",
-                        storeName,
-                        recoverableException.toString());
-
-                    // TODO K9113: we remove the re-init logic and push it to 
be handled by the thread directly
-
-                    restoreCount = 0L;
                 }
+                offset = globalConsumer.position(topicPartition);
+                stateRestoreAdapter.restoreBatch(restoreRecords);
+                stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
+                restoreCount += restoreRecords.size();
             }
             stateRestoreListener.onRestoreEnd(topicPartition, storeName, 
restoreCount);
             checkpointFileCache.put(topicPartition, offset);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index ddef7a4..b557330 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -33,6 +35,8 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
  * Updates the state for all Global State Stores.
  */
 public class GlobalStateUpdateTask implements GlobalStateMaintainer {
+    private final Logger log;
+    private final LogContext logContext;
 
     private final ProcessorTopology topology;
     private final InternalProcessorContext processorContext;
@@ -40,18 +44,18 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
     private final Map<String, RecordDeserializer> deserializers = new 
HashMap<>();
     private final GlobalStateManager stateMgr;
     private final DeserializationExceptionHandler 
deserializationExceptionHandler;
-    private final LogContext logContext;
 
-    public GlobalStateUpdateTask(final ProcessorTopology topology,
+    public GlobalStateUpdateTask(final LogContext logContext,
+                                 final ProcessorTopology topology,
                                  final InternalProcessorContext 
processorContext,
                                  final GlobalStateManager stateMgr,
-                                 final DeserializationExceptionHandler 
deserializationExceptionHandler,
-                                 final LogContext logContext) {
+                                 final DeserializationExceptionHandler 
deserializationExceptionHandler) {
+        this.logContext = logContext;
+        this.log = logContext.logger(getClass());
         this.topology = topology;
         this.stateMgr = stateMgr;
         this.processorContext = processorContext;
         this.deserializationExceptionHandler = deserializationExceptionHandler;
-        this.logContext = logContext;
     }
 
     /**
@@ -114,8 +118,16 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
         stateMgr.checkpoint(offsets);
     }
 
-    public void close() throws IOException {
+    public void close(final boolean wipeStateStore) throws IOException {
         stateMgr.close();
+        if (wipeStateStore) {
+            try {
+                log.info("Deleting global task directory after detecting 
corruption.");
+                Utils.delete(stateMgr.baseDir());
+            } catch (final IOException e) {
+                log.error("Failed to delete global task directory after 
detecting corruption.", e);
+            }
+        }
     }
 
     private void initTopology() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 923480f..14d1ef8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -234,24 +234,18 @@ public class GlobalStreamThread extends Thread {
         }
 
         void pollAndUpdate() {
-            try {
-                final ConsumerRecords<byte[], byte[]> received = 
globalConsumer.poll(pollTime);
-                for (final ConsumerRecord<byte[], byte[]> record : received) {
-                    stateMaintainer.update(record);
-                }
-                final long now = time.milliseconds();
-                if (now >= lastFlush + flushInterval) {
-                    stateMaintainer.flushState();
-                    lastFlush = now;
-                }
-            } catch (final InvalidOffsetException recoverableException) {
-                log.error("Updating global state failed. You can restart 
KafkaStreams to recover from this error.", recoverableException);
-                throw new StreamsException("Updating global state failed. " +
-                    "You can restart KafkaStreams to recover from this 
error.", recoverableException);
+            final ConsumerRecords<byte[], byte[]> received = 
globalConsumer.poll(pollTime);
+            for (final ConsumerRecord<byte[], byte[]> record : received) {
+                stateMaintainer.update(record);
+            }
+            final long now = time.milliseconds();
+            if (now >= lastFlush + flushInterval) {
+                stateMaintainer.flushState();
+                lastFlush = now;
             }
         }
 
-        public void close() throws IOException {
+        public void close(final boolean wipeStateStore) throws IOException {
             try {
                 globalConsumer.close();
             } catch (final RuntimeException e) {
@@ -260,7 +254,7 @@ public class GlobalStreamThread extends Thread {
                 log.error("Failed to close global consumer due to the 
following error:", e);
             }
 
-            stateMaintainer.close();
+            stateMaintainer.close(wipeStateStore);
         }
     }
 
@@ -284,10 +278,21 @@ public class GlobalStreamThread extends Thread {
         }
         setState(State.RUNNING);
 
+        boolean wipeStateStore = false;
         try {
             while (stillRunning()) {
                 stateConsumer.pollAndUpdate();
             }
+        } catch (final InvalidOffsetException recoverableException) {
+            wipeStateStore = true;
+            log.error(
+                "Updating global state failed due to inconsistent local state. 
Will attempt to clean up the local state. You can restart KafkaStreams to 
recover from this error.",
+                recoverableException
+            );
+            throw new StreamsException(
+                "Updating global state failed. You can restart KafkaStreams to 
recover from this error.",
+                recoverableException
+            );
         } finally {
             // set the state to pending shutdown first as it may be called due 
to error;
             // its state may already be PENDING_SHUTDOWN so it will return 
false but we
@@ -297,7 +302,7 @@ public class GlobalStreamThread extends Thread {
             log.info("Shutting down");
 
             try {
-                stateConsumer.close();
+                stateConsumer.close(wipeStateStore);
             } catch (final IOException e) {
                 log.error("Failed to close state maintainer due to the 
following error:", e);
             }
@@ -331,17 +336,36 @@ public class GlobalStreamThread extends Thread {
                 logContext,
                 globalConsumer,
                 new GlobalStateUpdateTask(
+                    logContext,
                     topology,
                     globalProcessorContext,
                     stateMgr,
-                    config.defaultDeserializationExceptionHandler(),
-                    logContext
+                    config.defaultDeserializationExceptionHandler()
                 ),
                 time,
                 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
                 config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
             );
-            stateConsumer.initialize();
+
+            try {
+                stateConsumer.initialize();
+            } catch (final InvalidOffsetException recoverableException) {
+                log.error(
+                    "Bootstrapping global state failed due to inconsistent 
local state. Will attempt to clean up the local state. You can restart 
KafkaStreams to recover from this error.",
+                    recoverableException
+                );
+
+                try {
+                    stateConsumer.close(true);
+                } catch (final IOException e) {
+                    log.error("Failed to close state consumer due to the 
following error:", e);
+                }
+
+                throw new StreamsException(
+                    "Bootstrapping global state failed. You can restart 
KafkaStreams to recover from this error.",
+                    recoverableException
+                );
+            }
 
             return stateConsumer;
         } catch (final LockException fatalException) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 4087141..463e47c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
@@ -324,21 +323,6 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
-        initializeConsumer(2, 0, t1);
-        consumer.setPollException(new InvalidOffsetException("Try Again!") {
-            public Set<TopicPartition> partitions() {
-                return Collections.singleton(t1);
-            }
-        });
-
-        stateManager.initialize();
-
-        stateManager.registerStore(store1, stateRestoreCallback);
-        assertEquals(2, stateRestoreCallback.restored.size());
-    }
-
-    @Test
     public void shouldListenForRestoreEvents() {
         initializeConsumer(5, 1, t1);
         stateManager.initialize();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 2319199..42d3fc9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -32,9 +32,11 @@ import org.apache.kafka.test.GlobalStateManagerStub;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.NoOpProcessorContext;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -44,6 +46,7 @@ import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -65,6 +68,7 @@ public class GlobalStateTaskTest {
     private final MockProcessorNode<?, ?> processorTwo = new 
MockProcessorNode<>();
 
     private final Map<TopicPartition, Long> offsets = new HashMap<>();
+    private File testDirectory = TestUtils.tempDirectory("global-store");
     private final NoOpProcessorContext context = new NoOpProcessorContext();
 
     private ProcessorTopology topology;
@@ -88,8 +92,14 @@ public class GlobalStateTaskTest {
 
         offsets.put(t1, 50L);
         offsets.put(t2, 100L);
-        stateMgr = new GlobalStateManagerStub(storeNames, offsets);
-        globalStateTask = new GlobalStateUpdateTask(topology, context, 
stateMgr, new LogAndFailExceptionHandler(), logContext);
+        stateMgr = new GlobalStateManagerStub(storeNames, offsets, 
testDirectory);
+        globalStateTask = new GlobalStateUpdateTask(
+            logContext,
+            topology,
+            context,
+            stateMgr,
+            new LogAndFailExceptionHandler()
+        );
     }
 
     @Test
@@ -171,11 +181,11 @@ public class GlobalStateTaskTest {
     @Test
     public void 
shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
         final GlobalStateUpdateTask globalStateTask2 = new 
GlobalStateUpdateTask(
+            logContext,
             topology,
             context,
             stateMgr,
-            new LogAndContinueExceptionHandler(),
-            logContext
+            new LogAndContinueExceptionHandler()
         );
         final byte[] key = new LongSerializer().serialize(topic2, 1L);
         final byte[] recordValue = new IntegerSerializer().serialize(topic2, 
10);
@@ -186,11 +196,11 @@ public class GlobalStateTaskTest {
     @Test
     public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() {
         final GlobalStateUpdateTask globalStateTask2 = new 
GlobalStateUpdateTask(
+            logContext,
             topology,
             context,
             stateMgr,
-            new LogAndContinueExceptionHandler(),
-            logContext
+            new LogAndContinueExceptionHandler()
         );
         final byte[] key = new IntegerSerializer().serialize(topic2, 1);
         final byte[] recordValue = new LongSerializer().serialize(topic2, 10L);
@@ -221,4 +231,10 @@ public class GlobalStateTaskTest {
         assertThat(stateMgr.changelogOffsets(), equalTo(expectedOffsets));
     }
 
+    @Test
+    public void shouldWipeGlobalStateDirectory() throws Exception {
+        assertTrue(stateMgr.baseDir().exists());
+        globalStateTask.close(true);
+        assertFalse(stateMgr.baseDir().exists());
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index ddb20c9..15bae37 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -31,8 +31,8 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
-import 
org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import 
org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -41,6 +41,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -63,12 +64,12 @@ public class GlobalStreamThreadTest {
     private final MockStateRestoreListener stateRestoreListener = new 
MockStateRestoreListener();
     private GlobalStreamThread globalStreamThread;
     private StreamsConfig config;
+    private String baseDirectoryName;
 
     private final static String GLOBAL_STORE_TOPIC_NAME = "foo";
     private final static String GLOBAL_STORE_NAME = "bar";
     private final TopicPartition topicPartition = new 
TopicPartition(GLOBAL_STORE_TOPIC_NAME, 0);
 
-    @SuppressWarnings("unchecked")
     @Before
     public void before() {
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized =
@@ -97,10 +98,11 @@ public class GlobalStreamThreadTest {
             "processorName",
             new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME));
 
+        baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath();
         final HashMap<String, Object> properties = new HashMap<>();
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
-        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah");
-        properties.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId");
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName);
         config = new StreamsConfig(properties);
         globalStreamThread = new GlobalStreamThread(
             builder.rewriteTopology(config).buildGlobalStateTopology(),
@@ -128,10 +130,9 @@ public class GlobalStreamThreadTest {
         assertFalse(globalStreamThread.stillRunning());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() {
-        final MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer(OffsetResetStrategy.EARLIEST) {
+        final MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 throw new RuntimeException("KABOOM!");
@@ -186,7 +187,6 @@ public class GlobalStreamThreadTest {
         assertFalse(globalStore.isOpen());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldTransitionToDeadOnClose() throws Exception {
         initializeConsumer();
@@ -197,7 +197,6 @@ public class GlobalStreamThreadTest {
         assertEquals(GlobalStreamThread.State.DEAD, 
globalStreamThread.state());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldStayDeadAfterTwoCloses() throws Exception {
         initializeConsumer();
@@ -209,7 +208,6 @@ public class GlobalStreamThreadTest {
         assertEquals(GlobalStreamThread.State.DEAD, 
globalStreamThread.state());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldTransitionToRunningOnStart() throws Exception {
         initializeConsumer();
@@ -224,7 +222,28 @@ public class GlobalStreamThreadTest {
     }
 
     @Test
-    public void shouldDieOnInvalidOffsetException() throws Exception {
+    public void shouldDieOnInvalidOffsetExceptionDuringStartup() throws 
Exception {
+        initializeConsumer();
+        mockConsumer.setPollException(new InvalidOffsetException("Try Again!") 
{
+            @Override
+            public Set<TopicPartition> partitions() {
+                return Collections.singleton(topicPartition);
+            }
+        });
+
+        globalStreamThread.start();
+
+        TestUtils.waitForCondition(
+            () -> globalStreamThread.state() == DEAD,
+            10 * 1000,
+            "GlobalStreamThread should have died."
+        );
+
+        assertFalse(new File(baseDirectoryName + File.separator + "testAppId" 
+ File.separator + "global").exists());
+    }
+
+    @Test
+    public void shouldDieOnInvalidOffsetExceptionWhileRunning() throws 
Exception {
         initializeConsumer();
         globalStreamThread.start();
 
@@ -247,13 +266,14 @@ public class GlobalStreamThreadTest {
                 return Collections.singleton(topicPartition);
             }
         });
-        // feed first record for recovery
-        mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 
0, 0L, "K1".getBytes(), "V1".getBytes()));
 
         TestUtils.waitForCondition(
             () -> globalStreamThread.state() == DEAD,
             10 * 1000,
-            "GlobalStreamThread should have died.");
+            "GlobalStreamThread should have died."
+        );
+
+        assertFalse(new File(baseDirectoryName + File.separator + "testAppId" 
+ File.separator + "global").exists());
     }
 
     private void initializeConsumer() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 606b380..1f98eb4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -110,21 +110,27 @@ public class StateConsumerTest {
 
     @Test
     public void shouldCloseConsumer() throws IOException {
-        stateConsumer.close();
+        stateConsumer.close(false);
         assertTrue(consumer.closed());
     }
 
     @Test
     public void shouldCloseStateMaintainer() throws IOException {
-        stateConsumer.close();
+        stateConsumer.close(false);
         assertTrue(stateMaintainer.closed);
     }
 
+    @Test
+    public void shouldWipeStoreOnClose() throws IOException {
+        stateConsumer.close(true);
+        assertTrue(stateMaintainer.wipeStore);
+    }
 
     private static class TaskStub implements GlobalStateMaintainer {
         private final Map<TopicPartition, Long> partitionOffsets;
         private final Map<TopicPartition, Integer> updatedPartitions = new 
HashMap<>();
         private boolean flushed;
+        private boolean wipeStore;
         private boolean closed;
 
         TaskStub(final Map<TopicPartition, Long> partitionOffsets) {
@@ -141,8 +147,9 @@ public class StateConsumerTest {
         }
 
         @Override
-        public void close() {
+        public void close(final boolean wipeStateStore) {
             closed = true;
+            wipeStore = wipeStateStore;
         }
 
         @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java 
b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index ae825bc..db8021b 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -21,22 +21,26 @@ import 
org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.GlobalStateManager;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 
 import java.io.File;
 import java.util.Map;
 import java.util.Set;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
 
 public class GlobalStateManagerStub implements GlobalStateManager {
 
     private final Set<String> storeNames;
     private final Map<TopicPartition, Long> offsets;
+    private final File baseDirectory;
     public boolean initialized;
     public boolean closed;
 
-    public GlobalStateManagerStub(final Set<String> storeNames, final 
Map<TopicPartition, Long> offsets) {
+    public GlobalStateManagerStub(final Set<String> storeNames,
+                                  final Map<TopicPartition, Long> offsets,
+                                  final File baseDirectory) {
         this.storeNames = storeNames;
         this.offsets = offsets;
+        this.baseDirectory = baseDirectory;
     }
 
     @Override
@@ -50,7 +54,7 @@ public class GlobalStateManagerStub implements 
GlobalStateManager {
 
     @Override
     public File baseDir() {
-        return null;
+        return baseDirectory;
     }
 
     @Override
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 31659bb..225fd74 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -436,11 +436,11 @@ public class TopologyTestDriver implements Closeable {
             
globalStateManager.setGlobalProcessorContext(globalProcessorContext);
 
             globalStateTask = new GlobalStateUpdateTask(
+                logContext,
                 globalTopology,
                 globalProcessorContext,
                 globalStateManager,
-                new LogAndContinueExceptionHandler(),
-                logContext
+                new LogAndContinueExceptionHandler()
             );
             globalStateTask.initialize();
             globalProcessorContext.setRecordContext(new ProcessorRecordContext(
@@ -1181,7 +1181,7 @@ public class TopologyTestDriver implements Closeable {
         }
         if (globalStateTask != null) {
             try {
-                globalStateTask.close();
+                globalStateTask.close(false);
             } catch (final IOException e) {
                 // ignore
             }

Reply via email to