Repository: kafka
Updated Branches:
  refs/heads/trunk f154956a7 -> fa4244745
KAFKA-2860: better handling of auto commit errors Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes #553 from hachikuji/KAFKA-2860 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa424474 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa424474 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa424474 Branch: refs/heads/trunk Commit: fa4244745fc363410fd5bc21e0b045f8124a8f9c Parents: f154956 Author: Jason Gustafson <[email protected]> Authored: Wed Nov 18 17:19:47 2015 -0800 Committer: Confluent <[email protected]> Committed: Wed Nov 18 17:19:47 2015 -0800 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 10 ++- .../consumer/internals/ConsumerCoordinator.java | 91 +++++++++++++++----- 2 files changed, 79 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/fa424474/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index cab7065..9aa1aaf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -242,12 +242,16 @@ public abstract class AbstractCoordinator implements Closeable { private class HeartbeatTask implements DelayedTask { + private boolean requestInFlight = false; + public void reset() { // start or restart the heartbeat task to be executed at the next chance long now = time.milliseconds(); heartbeat.resetSessionTimeout(now); 
client.unschedule(this); - client.schedule(this, now); + + if (!requestInFlight) + client.schedule(this, now); } @Override @@ -270,10 +274,13 @@ public abstract class AbstractCoordinator implements Closeable { client.schedule(this, now + heartbeat.timeToNextHeartbeat(now)); } else { heartbeat.sentHeartbeat(now); + requestInFlight = true; + RequestFuture<Void> future = sendHeartbeatRequest(); future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { + requestInFlight = false; long now = time.milliseconds(); heartbeat.receiveHeartbeat(now); long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now); @@ -282,6 +289,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void onFailure(RuntimeException e) { + requestInFlight = false; client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs); } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/fa424474/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 2ee3a4d..f6d1029 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -69,7 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final SubscriptionState subscriptions; private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; - private DelayedTask autoCommitTask = null; + private final AutoCommitTask autoCommitTask; /** * Initialize the coordination manager. 
@@ -112,9 +112,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { addMetadataListener(); - if (autoCommitEnabled) - this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs); - + this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null; this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); } @@ -179,6 +177,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // give the assignor a chance to update internal state based on the received assignment assignor.onAssignment(assignment); + // restart the autocommit task if needed + if (autoCommitEnabled) + autoCommitTask.enable(); + // execute the user's callback after rebalance ConsumerRebalanceListener listener = subscriptions.listener(); log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions()); @@ -309,8 +311,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // we do not need to re-enable wakeups since we are closing already client.disableWakeups(); try { - if (autoCommitTask != null) - client.unschedule(autoCommitTask); maybeAutoCommitOffsetsSync(); } finally { super.close(); @@ -362,25 +362,74 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } - private DelayedTask scheduleAutoCommitTask(final long interval) { - DelayedTask task = new DelayedTask() { - public void run(long now) { - commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - if (exception != null) - log.error("Auto offset commit failed.", exception); - } - }); - client.schedule(this, now + interval); + private class AutoCommitTask implements DelayedTask { + private final long interval; + private boolean enabled = false; + private boolean requestInFlight = false; + + public AutoCommitTask(long interval) { + this.interval = interval; + } 
+ + public void enable() { + if (!enabled) { + // there shouldn't be any instances scheduled, but call unschedule anyway to ensure + // that this task is only ever scheduled once + client.unschedule(this); + this.enabled = true; + + if (!requestInFlight) { + long now = time.milliseconds(); + client.schedule(this, interval + now); + } + } + } + + public void disable() { + this.enabled = false; + client.unschedule(this); + } + + private void reschedule(long at) { + if (enabled) + client.schedule(this, at); + } + + public void run(final long now) { + if (!enabled) + return; + + if (coordinatorUnknown()) { + log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff"); + client.schedule(this, now + retryBackoffMs); + return; } - }; - client.schedule(task, time.milliseconds() + interval); - return task; + + requestInFlight = true; + commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + requestInFlight = false; + if (exception == null) { + reschedule(now + interval); + } else if (exception instanceof SendFailedException) { + log.debug("Failed to send automatic offset commit, will retry immediately"); + reschedule(now); + } else { + log.warn("Auto offset commit failed: {}", exception.getMessage()); + reschedule(now + interval); + } + } + }); + } } private void maybeAutoCommitOffsetsSync() { if (autoCommitEnabled) { + // disable periodic commits prior to committing synchronously. 
note that they will + // be re-enabled after a rebalance completes + autoCommitTask.disable(); + try { commitOffsetsSync(subscriptions.allConsumed()); } catch (WakeupException e) { @@ -388,7 +437,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception - log.error("Auto offset commit failed.", e); + log.warn("Auto offset commit failed: ", e.getMessage()); } } }
