Repository: kafka
Updated Branches:
  refs/heads/trunk 36afd1095 -> 402aa093d


KAFKA-5937: Improve ProcessorStateManager exception handling

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Ted Yu <yuzhih...@gmail.com>, Damian Guy <damian....@gmail.com>, 
Guozhang Wang <wangg...@gmail.com>

Closes #3913 from mjsax/kafka-5937-exceptions-processor-state-manager


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/402aa093
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/402aa093
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/402aa093

Branch: refs/heads/trunk
Commit: 402aa093db243965d2b6c04118ac7ee6d196fd45
Parents: 36afd10
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Fri Sep 22 07:33:50 2017 +0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Fri Sep 22 07:33:50 2017 +0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        |  47 +++++----
 .../processor/internals/StateDirectory.java     |  36 ++++---
 .../internals/ProcessorStateManagerTest.java    | 102 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/402aa093/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 942e41a..2f16547 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
@@ -67,8 +66,7 @@ public class ProcessorStateManager implements StateManager {
     private OffsetCheckpoint checkpoint;
 
     /**
-     * @throws LockException if the state directory cannot be locked because 
another thread holds the lock
-     *                       (this might be recoverable by retrying)
+     * @throws ProcessorStateException if the task directory does not exist 
and could not be created
      * @throws IOException if any severe error happens while creating or 
locking the state directory
      */
     public ProcessorStateManager(final TaskId taskId,
@@ -96,15 +94,7 @@ public class ProcessorStateManager implements StateManager {
         restoreCallbacks = isStandby ? new HashMap<String, 
StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
 
-        // get a handle on the parent/base directory of the task directory
-        // note that the parent directory could have been accidentally deleted 
here,
-        // so catch that exception if that is the case
-        try {
-            baseDir = stateDirectory.directoryForTask(taskId);
-        } catch (final ProcessorStateException e) {
-            throw new LockException(String.format("%sFailed to get the 
directory for task %s. Exception %s",
-                logPrefix, taskId, e));
-        }
+        baseDir = stateDirectory.directoryForTask(taskId);
 
         // load the checkpoint information
         checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
@@ -256,40 +246,49 @@ public class ProcessorStateManager implements 
StateManager {
 
     @Override
     public void flush() {
+        ProcessorStateException firstException = null;
+        // attempting to flush the stores
         if (!stores.isEmpty()) {
             log.debug("Flushing all stores registered in the state manager");
             for (final StateStore store : stores.values()) {
+                log.trace("Flushing store {}", store.name());
                 try {
-                    log.trace("Flushing store={}", store.name());
                     store.flush();
                 } catch (final Exception e) {
-                    throw new ProcessorStateException(String.format("%sFailed 
to flush state store %s", logPrefix, store.name()), e);
+                    if (firstException == null) {
+                        firstException = new 
ProcessorStateException(String.format("%sFailed to flush state store %s", 
logPrefix, store.name()), e);
+                    }
+                    log.error("Failed to flush state store {}: ", 
store.name(), e);
                 }
             }
         }
+
+        if (firstException != null) {
+            throw firstException;
+        }
     }
 
     /**
      * {@link StateStore#close() Close} all stores (even in case of failure).
-     * Re-throw the first
+     * Log all exceptions and re-throw the first exception that occurred at 
the end.
      * @throws ProcessorStateException if any error happens when closing the 
state stores
      */
     @Override
     public void close(final Map<TopicPartition, Long> ackedOffsets) throws 
ProcessorStateException {
-        RuntimeException firstException = null;
+        ProcessorStateException firstException = null;
         // attempting to close the stores, just in case they
         // are not closed by a ProcessorNode yet
         if (!stores.isEmpty()) {
             log.debug("Closing its state manager and all the registered state 
stores");
-            for (final Map.Entry<String, StateStore> entry : 
stores.entrySet()) {
-                log.debug("Closing storage engine {}", entry.getKey());
+            for (final StateStore store : stores.values()) {
+                log.debug("Closing storage engine {}", store.name());
                 try {
-                    entry.getValue().close();
+                    store.close();
                 } catch (final Exception e) {
                     if (firstException == null) {
-                        firstException = new 
ProcessorStateException(String.format("%sFailed to close state store %s", 
logPrefix, entry.getKey()), e);
+                        firstException = new 
ProcessorStateException(String.format("%sFailed to close state store %s", 
logPrefix, store.name()), e);
                     }
-                    log.error("Failed to close state store {}: ", 
entry.getKey(), e);
+                    log.error("Failed to close state store {}: ", 
store.name(), e);
                 }
             }
 
@@ -309,11 +308,11 @@ public class ProcessorStateManager implements 
StateManager {
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
         log.trace("Writing checkpoint: {}", ackedOffsets);
         checkpointedOffsets.putAll(changelogReader.restoredOffsets());
-        for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
-            final String storeName = entry.getKey();
+        for (final StateStore store : stores.values()) {
+            final String storeName = store.name();
             // only checkpoint the offset to the offsets file if
             // it is persistent AND changelog enabled
-            if (entry.getValue().persistent() && 
storeToChangelogTopic.containsKey(storeName)) {
+            if (store.persistent() && 
storeToChangelogTopic.containsKey(storeName)) {
                 final String changelogTopic = 
storeToChangelogTopic.get(storeName);
                 final TopicPartition topicPartition = new 
TopicPartition(changelogTopic, getPartition(storeName));
                 if (ackedOffsets.containsKey(topicPartition)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/402aa093/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index c4262bc..b7bc45c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -62,39 +62,50 @@ public class StateDirectory {
         }
     }
 
+    /**
+     * Ensures that the state base directory as well as the application's 
sub-directory are created.
+     *
+     * @throws ProcessorStateException if the base state directory or 
application state directory does not exist
+     *                                 and could not be created
+     */
     public StateDirectory(final String applicationId, final String 
stateDirConfig, final Time time) {
         this.time = time;
         final File baseDir = new File(stateDirConfig);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
-            throw new ProcessorStateException(String.format("state directory 
[%s] doesn't exist and couldn't be created",
-                                                            stateDirConfig));
+            throw new ProcessorStateException(
+                String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirConfig));
         }
         stateDir = new File(baseDir, applicationId);
         if (!stateDir.exists() && !stateDir.mkdir()) {
-            throw new ProcessorStateException(String.format("state directory 
[%s] doesn't exist and couldn't be created",
-                                                            
stateDir.getPath()));
+            throw new ProcessorStateException(
+                String.format("state directory [%s] doesn't exist and couldn't 
be created", stateDir.getPath()));
         }
     }
 
     /**
-     * Get or create the directory for the {@link TaskId}
-     * @param taskId
+     * Get or create the directory for the provided {@link TaskId}.
      * @return directory for the {@link TaskId}
+     * @throws ProcessorStateException if the task directory does not exist 
and could not be created
      */
     File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
         if (!taskDir.exists() && !taskDir.mkdir()) {
-            throw new ProcessorStateException(String.format("task directory 
[%s] doesn't exist and couldn't be created",
-                                                            
taskDir.getPath()));
+            throw new ProcessorStateException(
+                String.format("task directory [%s] doesn't exist and couldn't 
be created", taskDir.getPath()));
         }
         return taskDir;
     }
 
+    /**
+     * Get or create the directory for the global stores.
+     * @return directory for the global stores
+     * @throws ProcessorStateException if the global store directory does not 
exist and could not be created
+     */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
         if (!dir.exists() && !dir.mkdir()) {
-            throw new ProcessorStateException(String.format("global state 
directory [%s] doesn't exist and couldn't be created",
-                                                            dir.getPath()));
+            throw new ProcessorStateException(
+                String.format("global state directory [%s] doesn't exist and 
couldn't be created", dir.getPath()));
         }
         return dir;
     }
@@ -102,6 +113,7 @@ public class StateDirectory {
     private String logPrefix() {
         return String.format("stream-thread [%s]", 
Thread.currentThread().getName());
     }
+
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId
@@ -192,9 +204,7 @@ public class StateDirectory {
     }
 
     /**
-     * Unlock the state directory for the given {@link TaskId}
-     * @param taskId
-     * @throws IOException
+     * Unlock the state directory for the given {@link TaskId}.
      */
     synchronized void unlock(final TaskId taskId) throws IOException {
         final LockAndOwner lockAndOwner = locks.get(taskId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/402aa093/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 1db2200..f3135d5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.test.MockChangelogReader;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -41,6 +42,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -79,7 +81,6 @@ public class ProcessorStateManagerTest {
     private OffsetCheckpoint checkpoint;
     private StateDirectory stateDirectory;
 
-
     @Before
     public void setup() {
         baseDir = TestUtils.tempDirectory();
@@ -487,6 +488,35 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
+    public void 
shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() throws 
IOException {
+
+        final ProcessorStateManager stateManager = new ProcessorStateManager(
+            taskId,
+            Collections.singleton(changelogTopicPartition),
+            false,
+            stateDirectory,
+            Collections.singletonMap(storeName, changelogTopic),
+            changelogReader,
+            false,
+            logContext);
+
+        final MockStateStoreSupplier.MockStateStore stateStore = new 
MockStateStoreSupplier.MockStateStore(storeName, true) {
+            @Override
+            public void flush() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        stateManager.register(stateStore, false, 
stateStore.stateRestoreCallback);
+
+        try {
+            stateManager.flush();
+            fail("Should throw ProcessorStateException if store flush throws 
exception");
+        } catch (final ProcessorStateException e) {
+            // pass
+        }
+    }
+
+    @Test
     public void 
shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws 
IOException {
 
         final ProcessorStateManager stateManager = new ProcessorStateManager(
@@ -516,6 +546,76 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
+    public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws 
IOException {
+        final ProcessorStateManager stateManager = new ProcessorStateManager(
+            taskId,
+            Collections.singleton(changelogTopicPartition),
+            false,
+            stateDirectory,
+            Collections.singletonMap(storeName, changelogTopic),
+            changelogReader,
+            false,
+            logContext);
+
+        final AtomicBoolean flushedStore = new AtomicBoolean(false);
+
+        final MockStateStoreSupplier.MockStateStore stateStore1 = new 
MockStateStoreSupplier.MockStateStore(storeName, true) {
+            @Override
+            public void flush() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        final MockStateStoreSupplier.MockStateStore stateStore2 = new 
MockStateStoreSupplier.MockStateStore(storeName + "2", true) {
+            @Override
+            public void flush() {
+                flushedStore.set(true);
+            }
+        };
+        stateManager.register(stateStore1, false, 
stateStore1.stateRestoreCallback);
+        stateManager.register(stateStore2, false, 
stateStore2.stateRestoreCallback);
+
+        try {
+            stateManager.flush();
+        } catch (final ProcessorStateException expected) { /* ignore */ }
+        Assert.assertTrue(flushedStore.get());
+    }
+
+    @Test
+    public void shouldCloseAllStoresEvenIfStoreThrowsExcepiton() throws 
IOException {
+        final ProcessorStateManager stateManager = new ProcessorStateManager(
+            taskId,
+            Collections.singleton(changelogTopicPartition),
+            false,
+            stateDirectory,
+            Collections.singletonMap(storeName, changelogTopic),
+            changelogReader,
+            false,
+            logContext);
+
+        final AtomicBoolean closedStore = new AtomicBoolean(false);
+
+        final MockStateStoreSupplier.MockStateStore stateStore1 = new 
MockStateStoreSupplier.MockStateStore(storeName, true) {
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        final MockStateStoreSupplier.MockStateStore stateStore2 = new 
MockStateStoreSupplier.MockStateStore(storeName + "2", true) {
+            @Override
+            public void close() {
+                closedStore.set(true);
+            }
+        };
+        stateManager.register(stateStore1, false, 
stateStore1.stateRestoreCallback);
+        stateManager.register(stateStore2, false, 
stateStore2.stateRestoreCallback);
+
+        try {
+            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+        } catch (final ProcessorStateException expected) { /* ignore */ }
+        Assert.assertTrue(closedStore.get());
+    }
+
+    @Test
     public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws 
IOException {
         checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
         assertTrue(checkpointFile.exists());

Reply via email to