Repository: kafka
Updated Branches:
  refs/heads/trunk 1b764c5e8 -> 996e29cfe
KAFKA-3619: File handles are leaked on .lock files of ProcessorStateManager

Kafka Streams seems to hold file handles on the `.lock` files for the state dirs, resulting in an explosion of filehandles over time. Running `lsof` shows the number of open filehandles on the `.lock` file increasing rapidly over time. In a separate test project, I reproduced the issue and determined that in order for the filehandle to be relinquished the `FileChannel` instance must be properly closed. Applying this patch seems to resolve the issue in my job.

Author: Greg Fodor <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1267 from gfodor/bug/state-lock-filehandle-leak

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/996e29cf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/996e29cf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/996e29cf

Branch: refs/heads/trunk
Commit: 996e29cfe8a9e5a45d4b778a84fb20479eeba469
Parents: 1b764c5
Author: Greg Fodor <[email protected]>
Authored: Mon Apr 25 13:45:51 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon Apr 25 13:45:51 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/ProcessorStateManager.java | 4 ++++
 .../apache/kafka/streams/processor/internals/StreamThread.java | 1 +
 .../streams/processor/internals/ProcessorStateManagerTest.java | 5 ++++-
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 003b988..0cdf44c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -134,6 +134,9 @@ public class ProcessorStateManager {
             retry--;
             lock = lockStateDirectory(channel);
         }
+        if (lock == null) {
+            channel.close();
+        }
         return lock;
     }
 
@@ -368,6 +371,7 @@ public class ProcessorStateManager {
         } finally {
             // release the state directory directoryLock
             directoryLock.release();
+            directoryLock.channel().close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f02683e..eff90e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -519,6 +519,7 @@ public class StreamThread extends Thread {
                 if (directoryLock != null) {
                     try {
                         directoryLock.release();
+                        directoryLock.channel().close();
                     } catch (IOException e) {
                         log.error("Failed to release the state directory lock");
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 84b59e6..e3669e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -213,7 +213,10 @@ public class ProcessorStateManagerTest {
             try {
                 assertNotNull(lock);
             } finally {
-                if (lock != null) lock.release();
+                if (lock != null) {
+                    lock.release();
+                    lock.channel().close();
+                }
             }
         } finally {
             Utils.delete(baseDir);
