Repository: kafka Updated Branches: refs/heads/trunk 36afd1095 -> 402aa093d
KAFKA-5937: Improve ProcessorStateManager exception handling Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Ted Yu <yuzhih...@gmail.com>, Damian Guy <damian....@gmail.com>, Guozhang Wang <wangg...@gmail.com> Closes #3913 from mjsax/kafka-5937-exceptions-processor-state-manager Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/402aa093 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/402aa093 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/402aa093 Branch: refs/heads/trunk Commit: 402aa093db243965d2b6c04118ac7ee6d196fd45 Parents: 36afd10 Author: Matthias J. Sax <matth...@confluent.io> Authored: Fri Sep 22 07:33:50 2017 +0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Fri Sep 22 07:33:50 2017 +0800 ---------------------------------------------------------------------- .../internals/ProcessorStateManager.java | 47 +++++---- .../processor/internals/StateDirectory.java | 36 ++++--- .../internals/ProcessorStateManagerTest.java | 102 ++++++++++++++++++- 3 files changed, 147 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/402aa093/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 942e41a..2f16547 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import 
org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; @@ -67,8 +66,7 @@ public class ProcessorStateManager implements StateManager { private OffsetCheckpoint checkpoint; /** - * @throws LockException if the state directory cannot be locked because another thread holds the lock - * (this might be recoverable by retrying) + * @throws ProcessorStateException if the task directory does not exist and could not be created * @throws IOException if any severe error happens while creating or locking the state directory */ public ProcessorStateManager(final TaskId taskId, @@ -96,15 +94,7 @@ public class ProcessorStateManager implements StateManager { restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null; this.storeToChangelogTopic = storeToChangelogTopic; - // get a handle on the parent/base directory of the task directory - // note that the parent directory could have been accidentally deleted here, - // so catch that exception if that is the case - try { - baseDir = stateDirectory.directoryForTask(taskId); - } catch (final ProcessorStateException e) { - throw new LockException(String.format("%sFailed to get the directory for task %s. 
Exception %s", - logPrefix, taskId, e)); - } + baseDir = stateDirectory.directoryForTask(taskId); // load the checkpoint information checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); @@ -256,40 +246,49 @@ public class ProcessorStateManager implements StateManager { @Override public void flush() { + ProcessorStateException firstException = null; + // attempting to flush the stores if (!stores.isEmpty()) { log.debug("Flushing all stores registered in the state manager"); for (final StateStore store : stores.values()) { + log.trace("Flushing store {}", store.name()); try { - log.trace("Flushing store={}", store.name()); store.flush(); } catch (final Exception e) { - throw new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e); + if (firstException == null) { + firstException = new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e); + } + log.error("Failed to flush state store {}: ", store.name(), e); } } } + + if (firstException != null) { + throw firstException; + } } /** * {@link StateStore#close() Close} all stores (even in case of failure). - * Re-throw the first + * Log all exception and re-throw the first exception that did occur at the end. 
* @throws ProcessorStateException if any error happens when closing the state stores */ @Override public void close(final Map<TopicPartition, Long> ackedOffsets) throws ProcessorStateException { - RuntimeException firstException = null; + ProcessorStateException firstException = null; // attempting to close the stores, just in case they // are not closed by a ProcessorNode yet if (!stores.isEmpty()) { log.debug("Closing its state manager and all the registered state stores"); - for (final Map.Entry<String, StateStore> entry : stores.entrySet()) { - log.debug("Closing storage engine {}", entry.getKey()); + for (final StateStore store : stores.values()) { + log.debug("Closing storage engine {}", store.name()); try { - entry.getValue().close(); + store.close(); } catch (final Exception e) { if (firstException == null) { - firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, entry.getKey()), e); + firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, store.name()), e); } - log.error("Failed to close state store {}: ", entry.getKey(), e); + log.error("Failed to close state store {}: ", store.name(), e); } } @@ -309,11 +308,11 @@ public class ProcessorStateManager implements StateManager { public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) { log.trace("Writing checkpoint: {}", ackedOffsets); checkpointedOffsets.putAll(changelogReader.restoredOffsets()); - for (final Map.Entry<String, StateStore> entry : stores.entrySet()) { - final String storeName = entry.getKey(); + for (final StateStore store : stores.values()) { + final String storeName = store.name(); // only checkpoint the offset to the offsets file if // it is persistent AND changelog enabled - if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) { + if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) { final String changelogTopic = 
storeToChangelogTopic.get(storeName); final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); if (ackedOffsets.containsKey(topicPartition)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/402aa093/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index c4262bc..b7bc45c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -62,39 +62,50 @@ public class StateDirectory { } } + /** + * Ensures that the state base directory as well as the application's sub-directory are created. + * + * @throws ProcessorStateException if the base state directory or application state directory does not exist + * and could not be created + */ public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) { this.time = time; final File baseDir = new File(stateDirConfig); if (!baseDir.exists() && !baseDir.mkdirs()) { - throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", - stateDirConfig)); + throw new ProcessorStateException( + String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirConfig)); } stateDir = new File(baseDir, applicationId); if (!stateDir.exists() && !stateDir.mkdir()) { - throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", - stateDir.getPath())); + throw new ProcessorStateException( + String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath())); } } /** - * Get or create the directory 
for the {@link TaskId} - * @param taskId + * Get or create the directory for the provided {@link TaskId}. * @return directory for the {@link TaskId} + * @throws ProcessorStateException if the task directory does not exist and could not be created */ File directoryForTask(final TaskId taskId) { final File taskDir = new File(stateDir, taskId.toString()); if (!taskDir.exists() && !taskDir.mkdir()) { - throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", - taskDir.getPath())); + throw new ProcessorStateException( + String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath())); } return taskDir; } + /** + * Get or create the directory for the global stores. + * @return directory for the global stores + * @throws ProcessorStateException if the global store directory does not exist and could not be created + */ File globalStateDir() { final File dir = new File(stateDir, "global"); if (!dir.exists() && !dir.mkdir()) { - throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", - dir.getPath())); + throw new ProcessorStateException( + String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath())); } return dir; } @@ -102,6 +113,7 @@ public class StateDirectory { private String logPrefix() { return String.format("stream-thread [%s]", Thread.currentThread().getName()); } + /** * Get the lock for the {@link TaskId}s directory if it is available * @param taskId @@ -192,9 +204,7 @@ public class StateDirectory { } /** - * Unlock the state directory for the given {@link TaskId} - * @param taskId - * @throws IOException + * Unlock the state directory for the given {@link TaskId}.
*/ synchronized void unlock(final TaskId taskId) throws IOException { final LockAndOwner lockAndOwner = locks.get(taskId); http://git-wip-us.apache.org/repos/asf/kafka/blob/402aa093/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 1db2200..f3135d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.test.MockChangelogReader; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,6 +42,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -79,7 +81,6 @@ public class ProcessorStateManagerTest { private OffsetCheckpoint checkpoint; private StateDirectory stateDirectory; - @Before public void setup() { baseDir = TestUtils.tempDirectory(); @@ -487,6 +488,35 @@ public class ProcessorStateManagerTest { } @Test + public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() throws IOException { + + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + Collections.singleton(changelogTopicPartition), + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic), + changelogReader, + false, + logContext); + + final 
MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) { + @Override + public void flush() { + throw new RuntimeException("KABOOM!"); + } + }; + stateManager.register(stateStore, false, stateStore.stateRestoreCallback); + + try { + stateManager.flush(); + fail("Should throw ProcessorStateException if store flush throws exception"); + } catch (final ProcessorStateException e) { + // pass + } + } + + @Test public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws IOException { final ProcessorStateManager stateManager = new ProcessorStateManager( @@ -516,6 +546,76 @@ public class ProcessorStateManagerTest { } @Test + public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException { + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + Collections.singleton(changelogTopicPartition), + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic), + changelogReader, + false, + logContext); + + final AtomicBoolean flushedStore = new AtomicBoolean(false); + + final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) { + @Override + public void flush() { + throw new RuntimeException("KABOOM!"); + } + }; + final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) { + @Override + public void flush() { + flushedStore.set(true); + } + }; + stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback); + stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback); + + try { + stateManager.flush(); + } catch (final ProcessorStateException expected) { /* ignore */ } + Assert.assertTrue(flushedStore.get()); + } + + @Test + public void shouldCloseAllStoresEvenIfStoreThrowsExcepiton() throws IOException { + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId,
Collections.singleton(changelogTopicPartition), + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic), + changelogReader, + false, + logContext); + + final AtomicBoolean closedStore = new AtomicBoolean(false); + + final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) { + @Override + public void close() { + throw new RuntimeException("KABOOM!"); + } + }; + final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) { + @Override + public void close() { + closedStore.set(true); + } + }; + stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback); + stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback); + + try { + stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + } catch (final ProcessorStateException expected) { /* ignore */ } + Assert.assertTrue(closedStore.get()); + } + + @Test public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException { checkpoint.write(Collections.<TopicPartition, Long>emptyMap()); assertTrue(checkpointFile.exists());