Repository: kafka Updated Branches: refs/heads/trunk d0614f97b -> 7b3d1bf6a
KAFKA-2867: Fix missing WorkerSourceTask synchronization and handling of InterruptException. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #566 from ewencp/kafka-2867-fix-source-sync-and-interrupt Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b3d1bf6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b3d1bf6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b3d1bf6 Branch: refs/heads/trunk Commit: 7b3d1bf6a398f5b1f454a7719d04f5ee9e630f96 Parents: d0614f9 Author: Ewen Cheslack-Postava <[email protected]> Authored: Fri Nov 20 10:04:40 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Fri Nov 20 10:04:40 2015 -0800 ---------------------------------------------------------------------- .../kafka/connect/runtime/WorkerSourceTask.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7b3d1bf6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 7178542..6c61d79 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -203,7 +203,7 @@ class WorkerSourceTask implements WorkerTask { removed = outstandingMessagesBacklog.remove(record); // But if neither one had it, something is very wrong if (removed == null) { - log.error("Saw callback for record that was not present in the outstanding message set: " + log.error("CRITICAL Saw callback for record that was not present in the outstanding message set: " + "{}", record); } else if (flushing && outstandingMessages.isEmpty()) { // flush thread may be waiting on the outstanding messages to clear @@ -231,19 +231,25 @@ class WorkerSourceTask implements WorkerTask { // to persistent storage // Next we need to wait for all outstanding messages to finish sending + log.debug("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); while (!outstandingMessages.isEmpty()) { try { long timeoutMs = timeout - time.milliseconds(); if (timeoutMs <= 0) { log.error( "Failed to flush {}, timed out while waiting for producer to flush outstanding " - + "messages", this.toString()); + + "messages, {} left ({})", this, outstandingMessages.size(), outstandingMessages); finishFailedFlush(); return false; } this.wait(timeoutMs); } catch (InterruptedException e) { - // ignore + // We can get interrupted if we take too long committing when the work thread shutdown is requested, + // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need + // to stop immediately + log.error("{} Interrupted while flushing messages, offsets will not be committed", this); + finishFailedFlush(); + return false; } } @@ -309,7 +315,7 @@ class WorkerSourceTask implements WorkerTask { flushing = false; } - private void finishSuccessfulFlush() { + private synchronized void finishSuccessfulFlush() { // If we were successful, we can just swap instead of replacing items back into the original map IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages; outstandingMessages = outstandingMessagesBacklog;
