This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new fa585d2 KAFKA-10306: GlobalThread should fail on InvalidOffsetException (#9075) fa585d2 is described below commit fa585d2773cb1e0f53194141e2e142fdbd9d3bd4 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Sun Jul 26 11:58:40 2020 -0700 KAFKA-10306: GlobalThread should fail on InvalidOffsetException (#9075) * KAFKA-10306: GlobalThread should fail on InvalidOffsetException * Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java Co-authored-by: John Roesler <vvcep...@users.noreply.github.com> * Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java Co-authored-by: John Roesler <vvcep...@users.noreply.github.com> * Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java Co-authored-by: John Roesler <vvcep...@users.noreply.github.com> * Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java Co-authored-by: John Roesler <vvcep...@users.noreply.github.com> --- .../processor/internals/GlobalStateMaintainer.java | 2 +- .../internals/GlobalStateManagerImpl.java | 29 +++------- .../processor/internals/GlobalStateUpdateTask.java | 24 ++++++-- .../processor/internals/GlobalStreamThread.java | 64 +++++++++++++++------- .../internals/GlobalStateManagerImplTest.java | 16 ------ .../processor/internals/GlobalStateTaskTest.java | 28 ++++++++-- .../internals/GlobalStreamThreadTest.java | 46 +++++++++++----- .../processor/internals/StateConsumerTest.java | 13 ++++- .../apache/kafka/test/GlobalStateManagerStub.java | 10 +++- .../apache/kafka/streams/TopologyTestDriver.java | 6 +- 10 files changed, 147 insertions(+), 91 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java 
index acb32f7..9a8aab6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java @@ -31,7 +31,7 @@ interface GlobalStateMaintainer { void flushState(); - void close() throws IOException; + void close(final boolean wipeStateStore) throws IOException; void update(ConsumerRecord<byte[], byte[]> record); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 1def55c..6f4a2c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -291,27 +290,17 @@ public class GlobalStateManagerImpl implements GlobalStateManager { long restoreCount = 0L; while (offset < highWatermark) { - try { - final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); - final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>(); - for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) { - if (record.key() != null) { - restoreRecords.add(recordConverter.convert(record)); - } + final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); + final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>(); 
+ for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) { + if (record.key() != null) { + restoreRecords.add(recordConverter.convert(record)); } - offset = globalConsumer.position(topicPartition); - stateRestoreAdapter.restoreBatch(restoreRecords); - stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); - restoreCount += restoreRecords.size(); - } catch (final InvalidOffsetException recoverableException) { - log.warn("Restoring GlobalStore {} failed due to: {}. Deleting global store to recreate from scratch.", - storeName, - recoverableException.toString()); - - // TODO K9113: we remove the re-init logic and push it to be handled by the thread directly - - restoreCount = 0L; } + offset = globalConsumer.position(topicPartition); + stateRestoreAdapter.restoreBatch(restoreRecords); + stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); + restoreCount += restoreRecords.size(); } stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount); checkpointFileCache.put(topicPartition, offset); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index ddef7a4..b557330 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -19,8 +19,10 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; import 
java.io.IOException; import java.util.HashMap; @@ -33,6 +35,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d * Updates the state for all Global State Stores. */ public class GlobalStateUpdateTask implements GlobalStateMaintainer { + private final Logger log; + private final LogContext logContext; private final ProcessorTopology topology; private final InternalProcessorContext processorContext; @@ -40,18 +44,18 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { private final Map<String, RecordDeserializer> deserializers = new HashMap<>(); private final GlobalStateManager stateMgr; private final DeserializationExceptionHandler deserializationExceptionHandler; - private final LogContext logContext; - public GlobalStateUpdateTask(final ProcessorTopology topology, + public GlobalStateUpdateTask(final LogContext logContext, + final ProcessorTopology topology, final InternalProcessorContext processorContext, final GlobalStateManager stateMgr, - final DeserializationExceptionHandler deserializationExceptionHandler, - final LogContext logContext) { + final DeserializationExceptionHandler deserializationExceptionHandler) { + this.logContext = logContext; + this.log = logContext.logger(getClass()); this.topology = topology; this.stateMgr = stateMgr; this.processorContext = processorContext; this.deserializationExceptionHandler = deserializationExceptionHandler; - this.logContext = logContext; } /** @@ -114,8 +118,16 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { stateMgr.checkpoint(offsets); } - public void close() throws IOException { + public void close(final boolean wipeStateStore) throws IOException { stateMgr.close(); + if (wipeStateStore) { + try { + log.info("Deleting global task directory after detecting corruption."); + Utils.delete(stateMgr.baseDir()); + } catch (final IOException e) { + log.error("Failed to delete global task directory after detecting corruption.", e); + } + } } 
private void initTopology() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 923480f..14d1ef8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -234,24 +234,18 @@ public class GlobalStreamThread extends Thread { } void pollAndUpdate() { - try { - final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime); - for (final ConsumerRecord<byte[], byte[]> record : received) { - stateMaintainer.update(record); - } - final long now = time.milliseconds(); - if (now >= lastFlush + flushInterval) { - stateMaintainer.flushState(); - lastFlush = now; - } - } catch (final InvalidOffsetException recoverableException) { - log.error("Updating global state failed. You can restart KafkaStreams to recover from this error.", recoverableException); - throw new StreamsException("Updating global state failed. 
" + - "You can restart KafkaStreams to recover from this error.", recoverableException); + final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime); + for (final ConsumerRecord<byte[], byte[]> record : received) { + stateMaintainer.update(record); + } + final long now = time.milliseconds(); + if (now >= lastFlush + flushInterval) { + stateMaintainer.flushState(); + lastFlush = now; } } - public void close() throws IOException { + public void close(final boolean wipeStateStore) throws IOException { try { globalConsumer.close(); } catch (final RuntimeException e) { @@ -260,7 +254,7 @@ public class GlobalStreamThread extends Thread { log.error("Failed to close global consumer due to the following error:", e); } - stateMaintainer.close(); + stateMaintainer.close(wipeStateStore); } } @@ -284,10 +278,21 @@ public class GlobalStreamThread extends Thread { } setState(State.RUNNING); + boolean wipeStateStore = false; try { while (stillRunning()) { stateConsumer.pollAndUpdate(); } + } catch (final InvalidOffsetException recoverableException) { + wipeStateStore = true; + log.error( + "Updating global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", + recoverableException + ); + throw new StreamsException( + "Updating global state failed. 
You can restart KafkaStreams to recover from this error.", + recoverableException + ); } finally { // set the state to pending shutdown first as it may be called due to error; // its state may already be PENDING_SHUTDOWN so it will return false but we @@ -297,7 +302,7 @@ public class GlobalStreamThread extends Thread { log.info("Shutting down"); try { - stateConsumer.close(); + stateConsumer.close(wipeStateStore); } catch (final IOException e) { log.error("Failed to close state maintainer due to the following error:", e); } @@ -331,17 +336,36 @@ public class GlobalStreamThread extends Thread { logContext, globalConsumer, new GlobalStateUpdateTask( + logContext, topology, globalProcessorContext, stateMgr, - config.defaultDeserializationExceptionHandler(), - logContext + config.defaultDeserializationExceptionHandler() ), time, Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) ); - stateConsumer.initialize(); + + try { + stateConsumer.initialize(); + } catch (final InvalidOffsetException recoverableException) { + log.error( + "Bootstrapping global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", + recoverableException + ); + + try { + stateConsumer.close(true); + } catch (final IOException e) { + log.error("Failed to close state consumer due to the following error:", e); + } + + throw new StreamsException( + "Bootstrapping global state failed. 
You can restart KafkaStreams to recover from this error.", + recoverableException + ); + } return stateConsumer; } catch (final LockException fatalException) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 4087141..463e47c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; @@ -324,21 +323,6 @@ public class GlobalStateManagerImplTest { } @Test - public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() { - initializeConsumer(2, 0, t1); - consumer.setPollException(new InvalidOffsetException("Try Again!") { - public Set<TopicPartition> partitions() { - return Collections.singleton(t1); - } - }); - - stateManager.initialize(); - - stateManager.registerStore(store1, stateRestoreCallback); - assertEquals(2, stateRestoreCallback.restored.size()); - } - - @Test public void shouldListenForRestoreEvents() { initializeConsumer(5, 1, t1); stateManager.initialize(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 2319199..42d3fc9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -32,9 +32,11 @@ import org.apache.kafka.test.GlobalStateManagerStub; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.NoOpProcessorContext; +import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -44,6 +46,7 @@ import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -65,6 +68,7 @@ public class GlobalStateTaskTest { private final MockProcessorNode<?, ?> processorTwo = new MockProcessorNode<>(); private final Map<TopicPartition, Long> offsets = new HashMap<>(); + private File testDirectory = TestUtils.tempDirectory("global-store"); private final NoOpProcessorContext context = new NoOpProcessorContext(); private ProcessorTopology topology; @@ -88,8 +92,14 @@ public class GlobalStateTaskTest { offsets.put(t1, 50L); offsets.put(t2, 100L); - stateMgr = new GlobalStateManagerStub(storeNames, offsets); - globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler(), logContext); + stateMgr = new GlobalStateManagerStub(storeNames, offsets, testDirectory); + globalStateTask = new GlobalStateUpdateTask( + logContext, + topology, + context, + stateMgr, + new LogAndFailExceptionHandler() + ); } @Test @@ -171,11 +181,11 @@ public class GlobalStateTaskTest { @Test public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() { final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask( + logContext, topology, context, stateMgr, - new 
LogAndContinueExceptionHandler(), - logContext + new LogAndContinueExceptionHandler() ); final byte[] key = new LongSerializer().serialize(topic2, 1L); final byte[] recordValue = new IntegerSerializer().serialize(topic2, 10); @@ -186,11 +196,11 @@ public class GlobalStateTaskTest { @Test public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() { final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask( + logContext, topology, context, stateMgr, - new LogAndContinueExceptionHandler(), - logContext + new LogAndContinueExceptionHandler() ); final byte[] key = new IntegerSerializer().serialize(topic2, 1); final byte[] recordValue = new LongSerializer().serialize(topic2, 10L); @@ -221,4 +231,10 @@ public class GlobalStateTaskTest { assertThat(stateMgr.changelogOffsets(), equalTo(expectedOffsets)); } + @Test + public void shouldWipeGlobalStateDirectory() throws Exception { + assertTrue(stateMgr.baseDir().exists()); + globalStateTask.close(true); + assertFalse(stateMgr.baseDir().exists()); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index ddb20c9..15bae37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -31,8 +31,8 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.InternalNameProvider; import org.apache.kafka.streams.kstream.internals.KTableSource; -import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer; import 
org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueStore; @@ -41,6 +41,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -63,12 +64,12 @@ public class GlobalStreamThreadTest { private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener(); private GlobalStreamThread globalStreamThread; private StreamsConfig config; + private String baseDirectoryName; private final static String GLOBAL_STORE_TOPIC_NAME = "foo"; private final static String GLOBAL_STORE_NAME = "bar"; private final TopicPartition topicPartition = new TopicPartition(GLOBAL_STORE_TOPIC_NAME, 0); - @SuppressWarnings("unchecked") @Before public void before() { final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = @@ -97,10 +98,11 @@ public class GlobalStreamThreadTest { "processorName", new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME)); + baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath(); final HashMap<String, Object> properties = new HashMap<>(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah"); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah"); - properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId"); + properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName); config = new StreamsConfig(properties); globalStreamThread = new GlobalStreamThread( builder.rewriteTopology(config).buildGlobalStateTopology(), @@ -128,10 +130,9 @@ public class GlobalStreamThreadTest { assertFalse(globalStreamThread.stillRunning()); } - @SuppressWarnings("unchecked") @Test public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() { 
- final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) { @Override public List<PartitionInfo> partitionsFor(final String topic) { throw new RuntimeException("KABOOM!"); @@ -186,7 +187,6 @@ public class GlobalStreamThreadTest { assertFalse(globalStore.isOpen()); } - @SuppressWarnings("unchecked") @Test public void shouldTransitionToDeadOnClose() throws Exception { initializeConsumer(); @@ -197,7 +197,6 @@ public class GlobalStreamThreadTest { assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state()); } - @SuppressWarnings("unchecked") @Test public void shouldStayDeadAfterTwoCloses() throws Exception { initializeConsumer(); @@ -209,7 +208,6 @@ public class GlobalStreamThreadTest { assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state()); } - @SuppressWarnings("unchecked") @Test public void shouldTransitionToRunningOnStart() throws Exception { initializeConsumer(); @@ -224,7 +222,28 @@ public class GlobalStreamThreadTest { } @Test - public void shouldDieOnInvalidOffsetException() throws Exception { + public void shouldDieOnInvalidOffsetExceptionDuringStartup() throws Exception { + initializeConsumer(); + mockConsumer.setPollException(new InvalidOffsetException("Try Again!") { + @Override + public Set<TopicPartition> partitions() { + return Collections.singleton(topicPartition); + } + }); + + globalStreamThread.start(); + + TestUtils.waitForCondition( + () -> globalStreamThread.state() == DEAD, + 10 * 1000, + "GlobalStreamThread should have died." 
+ ); + + assertFalse(new File(baseDirectoryName + File.separator + "testAppId" + File.separator + "global").exists()); + } + + @Test + public void shouldDieOnInvalidOffsetExceptionWhileRunning() throws Exception { initializeConsumer(); globalStreamThread.start(); @@ -247,13 +266,14 @@ public class GlobalStreamThreadTest { return Collections.singleton(topicPartition); } }); - // feed first record for recovery - mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes())); TestUtils.waitForCondition( () -> globalStreamThread.state() == DEAD, 10 * 1000, - "GlobalStreamThread should have died."); + "GlobalStreamThread should have died." + ); + + assertFalse(new File(baseDirectoryName + File.separator + "testAppId" + File.separator + "global").exists()); } private void initializeConsumer() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java index 606b380..1f98eb4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java @@ -110,21 +110,27 @@ public class StateConsumerTest { @Test public void shouldCloseConsumer() throws IOException { - stateConsumer.close(); + stateConsumer.close(false); assertTrue(consumer.closed()); } @Test public void shouldCloseStateMaintainer() throws IOException { - stateConsumer.close(); + stateConsumer.close(false); assertTrue(stateMaintainer.closed); } + @Test + public void shouldWipeStoreOnClose() throws IOException { + stateConsumer.close(true); + assertTrue(stateMaintainer.wipeStore); + } private static class TaskStub implements GlobalStateMaintainer { private final Map<TopicPartition, Long> partitionOffsets; private final Map<TopicPartition, Integer> updatedPartitions = new HashMap<>(); private boolean flushed; 
+ private boolean wipeStore; private boolean closed; TaskStub(final Map<TopicPartition, Long> partitionOffsets) { @@ -141,8 +147,9 @@ public class StateConsumerTest { } @Override - public void close() { + public void close(final boolean wipeStateStore) { closed = true; + wipeStore = wipeStateStore; } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index ae825bc..db8021b 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -21,22 +21,26 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.GlobalStateManager; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import java.io.File; import java.util.Map; import java.util.Set; -import org.apache.kafka.streams.processor.internals.Task.TaskType; public class GlobalStateManagerStub implements GlobalStateManager { private final Set<String> storeNames; private final Map<TopicPartition, Long> offsets; + private final File baseDirectory; public boolean initialized; public boolean closed; - public GlobalStateManagerStub(final Set<String> storeNames, final Map<TopicPartition, Long> offsets) { + public GlobalStateManagerStub(final Set<String> storeNames, + final Map<TopicPartition, Long> offsets, + final File baseDirectory) { this.storeNames = storeNames; this.offsets = offsets; + this.baseDirectory = baseDirectory; } @Override @@ -50,7 +54,7 @@ public class GlobalStateManagerStub implements GlobalStateManager { @Override public File baseDir() { - return null; + return baseDirectory; } @Override diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 31659bb..225fd74 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -436,11 +436,11 @@ public class TopologyTestDriver implements Closeable { globalStateManager.setGlobalProcessorContext(globalProcessorContext); globalStateTask = new GlobalStateUpdateTask( + logContext, globalTopology, globalProcessorContext, globalStateManager, - new LogAndContinueExceptionHandler(), - logContext + new LogAndContinueExceptionHandler() ); globalStateTask.initialize(); globalProcessorContext.setRecordContext(new ProcessorRecordContext( @@ -1181,7 +1181,7 @@ public class TopologyTestDriver implements Closeable { } if (globalStateTask != null) { try { - globalStateTask.close(); + globalStateTask.close(false); } catch (final IOException e) { // ignore }