Repository: kafka Updated Branches: refs/heads/trunk 1faab034b -> c1f8f689a
HOTFIX: poll even when all partitions are paused. handle concurrent cleanup * We need to poll periodically even when all partitions are paused in order to respond to a possible rebalance promptly. * There is a race condition when two (or more) threads try to clean up the same state directory. One of the threads fails with FileNotFoundException. Thus the new code simply catches and ignores it. Author: Yasuhiro Matsuda <[email protected]> Reviewers: Gwen Shapira Closes #893 from ymatsuda/hotfix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1f8f689 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1f8f689 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1f8f689 Branch: refs/heads/trunk Commit: c1f8f689af43f5ce5a95dad86537db4615449694 Parents: 1faab03 Author: Yasuhiro Matsuda <[email protected]> Authored: Wed Feb 10 15:02:27 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Wed Feb 10 15:02:27 2016 -0700 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 43 +++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c1f8f689/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d51974a..18dc0ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -51,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import 
java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.FileLock; import java.util.ArrayList; @@ -300,6 +301,7 @@ public class StreamThread extends Thread { private void runLoop() { int totalNumBuffered = 0; + long lastPoll = 0L; boolean requiresPoll = true; ensureCopartitioning(builder.copartitionGroups()); @@ -314,6 +316,7 @@ public class StreamThread extends Thread { long startPoll = time.milliseconds(); ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); + lastPoll = time.milliseconds(); if (!records.isEmpty()) { for (TopicPartition partition : records.partitions()) { @@ -340,6 +343,12 @@ public class StreamThread extends Thread { } maybePunctuate(); + + // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance + // even when we paused all partitions. + if (lastPoll + this.pollTimeMs < time.milliseconds()) + requiresPoll = true; + } else { // even when no task is assigned, we must poll to get a task. 
requiresPoll = true; @@ -489,21 +498,25 @@ public class StreamThread extends Thread { TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1)); // try to acquire the exclusive lock on the state directory - FileLock directoryLock = null; - try { - directoryLock = ProcessorStateManager.lockStateDirectory(dir); - if (directoryLock != null) { - log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs); - Utils.delete(dir); - } - } catch (IOException e) { - log.error("Failed to lock the state directory due to an unexpected exception", e); - } finally { - if (directoryLock != null) { - try { - directoryLock.release(); - } catch (IOException e) { - log.error("Failed to release the state directory lock"); + if (dir.exists()) { + FileLock directoryLock = null; + try { + directoryLock = ProcessorStateManager.lockStateDirectory(dir); + if (directoryLock != null) { + log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs); + Utils.delete(dir); + } + } catch (FileNotFoundException e) { + // the state directory may be deleted by another thread + } catch (IOException e) { + log.error("Failed to lock the state directory due to an unexpected exception", e); + } finally { + if (directoryLock != null) { + try { + directoryLock.release(); + } catch (IOException e) { + log.error("Failed to release the state directory lock"); + } } } }
