This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a932520 KAFKA-7126; Reduce number of rebalance for large consumer group after a topic is created
a932520 is described below
commit a932520135d42c7d9731064d96c21ab2fc5de696
Author: Jon Lee <[email protected]>
AuthorDate: Thu Jul 26 10:28:07 2018 -0700
KAFKA-7126; Reduce number of rebalance for large consumer group after a topic is created
This patch forces a metadata update for consumers with pattern subscription
at the beginning of a rebalance (retry.backoff.ms is respected). This prevents
such consumers from detecting subscription changes (e.g., new topic creation)
independently and triggering multiple unnecessary rebalances.
KAFKA-7126 contains detailed scenarios and rationale.
Author: Jon Lee <[email protected]>
Reviewers: Jason Gustafson <[email protected]>, Ted Yu
<[email protected]>, Dong Lin <[email protected]>
Closes #5408 from jonlee2/KAFKA-7126
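
For illustration, the heart of the change is a small gating step run when a
pattern-subscribed consumer is about to (re)join the group. The condensed sketch
below is not part of the commit; currentTime and remainingMs stand in for the
coordinator's actual timing variables (see the full diff that follows):

    // Before re-joining, force a metadata refresh, but only once the
    // retry.backoff.ms window since the last refresh has elapsed.
    if (subscriptions.hasPatternSubscription()) {
        if (metadata.timeToAllowUpdate(currentTime) == 0) {
            metadata.requestUpdate();
        }
        // Block until fresh metadata arrives or the remaining poll time runs out.
        if (!client.ensureFreshMetadata(remainingMs)) {
            return false;
        }
    }

With all members converging on the same metadata snapshot before joining, a single
topic creation typically triggers one rebalance for the group instead of several.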
---
.../java/org/apache/kafka/clients/Metadata.java | 16 +++++++-
.../consumer/internals/ConsumerCoordinator.java | 10 +++++
.../internals/ConsumerCoordinatorTest.java | 44 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 6c663cf..1413eac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -125,14 +125,26 @@ public final class Metadata implements Closeable {
}
/**
+ * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+ *
+ * @param nowMs current time in ms
+ * @return remaining time in ms till the cluster info can be updated again
+ */
+ public synchronized long timeToAllowUpdate(long nowMs) {
+ return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+ }
+
+ /**
* The next time to update the cluster info is the maximum of the time the current info will expire and the time the
* current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
* is now
+ *
+ * @param nowMs current time in ms
+ * @return remaining time in ms till updating the cluster info
*/
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
- long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
- return Math.max(timeToExpire, timeToAllowUpdate);
+ return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index f9b77e9..8f25d6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -328,6 +328,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription()) {
+ // For consumer group that uses pattern-based subscription, after a topic is created,
+ // any consumer that discovers the topic after metadata refresh can trigger rebalance
+ // across the entire consumer group. Multiple rebalances can be triggered after one topic
+ // creation if consumers refresh metadata at vastly different times. We can significantly
+ // reduce the number of rebalances caused by single topic creation by asking consumer to
+ // refresh metadata before re-joining the group as long as the refresh backoff time has
+ // passed.
+ if (this.metadata.timeToAllowUpdate(currentTime) == 0) {
+ this.metadata.requestUpdate();
+ }
if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
return false;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ba392c6..cec56b0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -514,6 +514,50 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
+ // Set up a non-leader consumer with pattern subscription and a cluster containing one topic matching the
+ // pattern.
+ final String consumerId = "consumer";
+
+ subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+ metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+ assertEquals(singleton(topic1), subscriptions.subscription());
+
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+ // Instrument the test so that metadata will contain two topics after next refresh.
+ client.prepareMetadataUpdate(cluster, Collections.emptySet());
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
+ return sync.memberId().equals(consumerId) &&
+ sync.generationId() == 1 &&
+ sync.groupAssignment().isEmpty();
+ }
+ }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+ partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+ // This will trigger rebalance.
+ coordinator.poll(Long.MAX_VALUE);
+
+ // Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain two topics.
+ final Set<String> updatedSubscriptionSet = new HashSet<>(Arrays.asList(topic1, topic2));
+ assertEquals(updatedSubscriptionSet, subscriptions.subscription());
+
+ // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
+ // rebalance again.
+ metadata.requestUpdate();
+ client.poll(Long.MAX_VALUE, time.milliseconds());
+ assertFalse(coordinator.rejoinNeededOrPending());
+ }
+
+ @Test
public void testWakeupDuringJoin() {
final String consumerId = "leader";