Repository: kafka Updated Branches: refs/heads/trunk 68f42210a -> 4a9e7607b
KAFKA-2742: Fix SourceTaskOffsetCommitter to handle removal of commit tasks when they are already in progress. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Guozhang Wang Closes #421 from ewencp/wait-on-in-progress-source-offset-commits Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a9e7607 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a9e7607 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a9e7607 Branch: refs/heads/trunk Commit: 4a9e7607b9e563195556873b6ec9b74561cedfbb Parents: 68f4221 Author: Ewen Cheslack-Postava <[email protected]> Authored: Thu Nov 5 08:42:44 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Nov 5 08:42:44 2015 -0800 ---------------------------------------------------------------------- .../runtime/SourceTaskOffsetCommitter.java | 67 +++++++++++++++----- 1 file changed, 52 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4a9e7607/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java index 20a79ca..6bb51b9 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java @@ -18,11 +18,13 @@ package org.apache.kafka.copycat.runtime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -39,9 +41,6 @@ import java.util.concurrent.TimeUnit; * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and * ensures they are invoked in a timely fashion. * </p> - * <p> - * The current implementation uses a single thread to process commits and - * </p> */ class SourceTaskOffsetCommitter { private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class); @@ -49,7 +48,7 @@ class SourceTaskOffsetCommitter { private Time time; private WorkerConfig config; private ScheduledExecutorService commitExecutorService = null; - private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>(); + private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>(); SourceTaskOffsetCommitter(Time time, WorkerConfig config) { this.time = time; @@ -69,22 +68,43 @@ class SourceTaskOffsetCommitter { } public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) { - long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - commit(workerTask); - } - }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS); - committers.put(id, commitFuture); + synchronized (committers) { + long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() { + @Override + public void run() { + commit(id, workerTask); + } + }, commitIntervalMs, TimeUnit.MILLISECONDS); + committers.put(id, new ScheduledCommitTask(commitFuture)); + } } public void remove(ConnectorTaskId id) { - ScheduledFuture<?> commitFuture = committers.remove(id); - commitFuture.cancel(false); + final ScheduledCommitTask task; + synchronized (committers) { + task = committers.remove(id); + task.cancelled = true; + task.commitFuture.cancel(false); + } + if (task.finishedLatch != null) { + try { + task.finishedLatch.await(); + } catch (InterruptedException e) { + throw new CopycatException("Unexpected interruption in SourceTaskOffsetCommitter.", e); + } + } } - public void commit(WorkerSourceTask workerTask) { + public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) { + final ScheduledCommitTask task; + synchronized (committers) { + task = committers.get(id); + if (task == null || task.cancelled) + return; + task.finishedLatch = new CountDownLatch(1); + } + try { log.debug("Committing offsets for {}", workerTask); boolean success = workerTask.commitOffsets(); @@ -96,7 +116,24 @@ class SourceTaskOffsetCommitter { // thread would cause the fixed interval schedule on the ExecutorService to stop running // for that task log.error("Unhandled exception when committing {}: ", workerTask, t); + } finally { + synchronized (committers) { + task.finishedLatch.countDown(); + if (!task.cancelled) + schedule(id, workerTask); + } } } + private static class ScheduledCommitTask { + ScheduledFuture<?> commitFuture; + boolean cancelled; + CountDownLatch finishedLatch; + + ScheduledCommitTask(ScheduledFuture<?> commitFuture) { + this.commitFuture = commitFuture; + this.cancelled = false; + this.finishedLatch = null; + } + } }
