This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ac267dc KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
ac267dc is described below
commit ac267dc5cec605f3981c9db7d889cabd59f09a61
Author: huxi <[email protected]>
AuthorDate: Fri Feb 9 01:49:12 2018 +0800
KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
Currently `maybeAutoCommitOffsetsAsync` does not try to find the
coordinator when it is unknown; it only postpones the next attempt by the
retry backoff. As a result, asynchronous auto-commits can fail indefinitely
while the coordinator remains unknown. This patch adds coordinator discovery
to the async auto-commit path.
---
.../apache/kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 20 +++++++-------------
.../consumer/internals/ConsumerCoordinatorTest.java | 19 +++++++++++++++++++
3 files changed, 27 insertions(+), 14 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d84f84..2f7fd58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1058,7 +1058,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
// make sure the offsets of topic partitions the consumer is
unsubscribing from
// are committed since there will be no following rebalance
- this.coordinator.maybeAutoCommitOffsetsNow();
+
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.debug("Subscribed to partition(s): {}",
Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5c1e60e..d7c1ce9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -528,6 +528,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
+ client.pollNoWakeup();
}
@Override
@@ -623,20 +624,10 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
return false;
}
- private void maybeAutoCommitOffsetsAsync(long now) {
- if (autoCommitEnabled) {
- if (coordinatorUnknown()) {
- this.nextAutoCommitDeadline = now + retryBackoffMs;
- } else if (now >= nextAutoCommitDeadline) {
- this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
- doAutoCommitOffsetsAsync();
- }
- }
- }
-
- public void maybeAutoCommitOffsetsNow() {
- if (autoCommitEnabled && !coordinatorUnknown())
+ public void maybeAutoCommitOffsetsAsync(long now) {
+ if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
doAutoCommitOffsetsAsync();
+ }
}
private void doAutoCommitOffsetsAsync() {
@@ -650,8 +641,11 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
log.warn("Asynchronous auto-commit of offsets {} failed:
{}", offsets, exception.getMessage());
if (exception instanceof RetriableException)
nextAutoCommitDeadline = Math.min(time.milliseconds()
+ retryBackoffMs, nextAutoCommitDeadline);
+ else
+ nextAutoCommitDeadline = time.milliseconds() +
autoCommitIntervalMs;
} else {
log.debug("Completed asynchronous auto-commit of offsets
{}", offsets);
+ nextAutoCommitDeadline = time.milliseconds() +
autoCommitIntervalMs;
}
}
});
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 76301a7..c49339b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1625,6 +1625,25 @@ public class ConsumerCoordinatorTest {
assertFalse("Heartbeat thread active after close",
threads[i].getName().contains(groupId));
}
+ @Test
+ public void testAutoCommitAfterCoordinatorBackToService() {
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(),
assignors,
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+ subscriptions.assignFromUser(Collections.singleton(t1p));
+ subscriptions.seek(t1p, 100L);
+
+ coordinator.coordinatorDead();
+ assertTrue(coordinator.coordinatorUnknown());
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p,
Errors.NONE)));
+
+ // async commit offset should find coordinator
+ time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto
commit does happen
+ coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+ assertFalse(coordinator.coordinatorUnknown());
+ assertEquals(subscriptions.committed(t1p).offset(), 100L);
+ }
+
private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean
useGroupManagement,
final boolean
autoCommit,
final boolean
leaveGroup) {
--
To stop receiving notification emails like this one, please contact
[email protected].