Repository: kafka Updated Branches: refs/heads/trunk e9a67a8da -> 600859e77
KAFKA-4392; Handle NoSuchFileException gracefully in StateDirectory Author: Guozhang Wang <[email protected]> Reviewers: Damian Guy <[email protected]>, Ismael Juma <[email protected]> Closes #2121 from guozhangwang/K4392-race-dir-cleanup Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/600859e7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/600859e7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/600859e7 Branch: refs/heads/trunk Commit: 600859e77c8e9a93212815ac26a6ae8b778fee6f Parents: e9a67a8 Author: Guozhang Wang <[email protected]> Authored: Thu Dec 8 15:50:24 2016 +0000 Committer: Ismael Juma <[email protected]> Committed: Thu Dec 8 15:50:24 2016 +0000 ---------------------------------------------------------------------- .../streams/processor/internals/StateDirectory.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/600859e7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 3048fba..a48ec59 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.HashMap; @@ -87,7 +88,17 @@ public class StateDirectory { 
return true; } final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); - final FileChannel channel = getOrCreateFileChannel(taskId, lockFile.toPath()); + + final FileChannel channel; + + try { + channel = getOrCreateFileChannel(taskId, lockFile.toPath()); + } catch (NoSuchFileException e) { + // FileChannel.open(..) could throw NoSuchFileException when another thread is concurrently + // deleting the parent directory of the lock file (i.e. the directory of the taskId). + // In this case we return immediately, indicating that locking failed. + return false; + } FileLock lock = tryAcquireLock(channel); while (lock == null && retry > 0) {
