Repository: kafka Updated Branches: refs/heads/0.10.0 9ba2fdf8b -> 8d38c115a
KAFKA-3805: Check if DB is null. - Check if DB is null before flushing or closing. In some cases, a state store is closed twice. This happens in `StreamTask.close()` where both `node.close()` and `super.close` (in `ProcessorManager`) are called in a sequence. If the user's processor defines a `close` that closes the underlying state store, then the second close will be redundant. Author: Eno Thereska <[email protected]> Reviewers: Andrés Gómez, Ismael Juma, Guozhang Wang Closes #1485 from enothereska/KAFKA-3805-locks (cherry picked from commit 751fe9309011b99f60c1cb03c23a47d0444dce05) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d38c115 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d38c115 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d38c115 Branch: refs/heads/0.10.0 Commit: 8d38c115ab6e61496eee84f790220f1643a1a804 Parents: 9ba2fdf Author: Eno Thereska <[email protected]> Authored: Thu Jun 16 16:18:02 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Jun 16 16:18:11 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/streams/processor/StateStore.java | 4 +++- .../streams/processor/internals/ProcessorStateManager.java | 4 ++++ .../apache/kafka/streams/state/internals/RocksDBStore.java | 9 +++++++++ 3 files changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8d38c115/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index f79e6f6..68f3644 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -46,7 +46,9 @@ public interface StateStore { void flush(); /** - * Close the storage engine + * Close the storage engine. + * Note that this function needs to be idempotent since it may be called + * several times on the same state store. */ void close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/8d38c115/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 1d97384..92b1069 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -134,6 +134,8 @@ public class ProcessorStateManager { retry--; lock = lockStateDirectory(channel); } + // TODO: closing the channel here risks releasing all locks on the file + // see {@link https://issues.apache.org/jira/browse/KAFKA-3812} if (lock == null) { channel.close(); } @@ -336,6 +338,8 @@ public class ProcessorStateManager { */ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException { try { + // attempting to flush and close the stores, just in case they + // are not closed by a ProcessorNode yet if (!stores.isEmpty()) { log.debug("Closing stores."); for (Map.Entry<String, StateStore> entry : stores.entrySet()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8d38c115/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 37609a0..a6dc881 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -404,6 +404,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @Override public void flush() { + if (db == null) { + return; + } + // flush of the cache entries if necessary flushCache(); @@ -424,6 +428,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @Override public void close() { + + if (db == null) { + return; + } + flush(); db.close(); }
