This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b0de9ba  KAFKA-7126; Reduce number of rebalance for large consumer 
group after a topic is created
b0de9ba is described below

commit b0de9bad4af60ca9df98974f513579091f431959
Author: Jon Lee <[email protected]>
AuthorDate: Thu Jul 26 10:28:07 2018 -0700

    KAFKA-7126; Reduce number of rebalance for large consumer group after a 
topic is created
    
    This patch forces metadata update for consumers with pattern subscription 
at the beginning of rebalance (retry.backoff.ms is respected). This is to 
prevent such consumers from detecting subscription changes (e.g., new topic 
creation) independently and triggering multiple unnecessary rebalances. 
KAFKA-7126 contains detailed scenarios and rationale.
    
    Author: Jon Lee <[email protected]>
    
    Reviewers: Jason Gustafson <[email protected]>, Ted Yu 
<[email protected]>, Dong Lin <[email protected]>
    
    Closes #5408 from jonlee2/KAFKA-7126
    
    (cherry picked from commit a932520135d42c7d9731064d96c21ab2fc5de696)
    Signed-off-by: Dong Lin <[email protected]>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 16 +++++++-
 .../consumer/internals/ConsumerCoordinator.java    | 10 +++++
 .../internals/ConsumerCoordinatorTest.java         | 44 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ec07f13..17d9839 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -125,14 +125,26 @@ public final class Metadata implements Closeable {
     }
 
     /**
+     * Return the remaining time until the current cluster info can be updated 
again (i.e., until the backoff time has elapsed).
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till the cluster info can be updated again
+     */
+    public synchronized long timeToAllowUpdate(long nowMs) {
+        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+    }
+
+    /**
      * The next time to update the cluster info is the maximum of the time the 
current info will expire and the time the
      * current info can be updated (i.e. backoff time has elapsed); if an 
update has been requested then the expiry time
      * is now
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till updating the cluster info
      */
     public synchronized long timeToNextUpdate(long nowMs) {
         long timeToExpire = needUpdate ? 0 : 
Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - 
nowMs;
-        return Math.max(timeToExpire, timeToAllowUpdate);
+        return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index e04cdeb..51ae58e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -323,6 +323,16 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                 // we need to ensure that the metadata is fresh before joining 
initially. This ensures
                 // that we have matched the pattern against the cluster's 
topics at least once before joining.
                 if (subscriptions.hasPatternSubscription()) {
+                    // For a consumer group that uses pattern-based 
subscription, after a topic is created,
+                    // any consumer that discovers the topic after metadata 
refresh can trigger rebalance
+                    // across the entire consumer group. Multiple rebalances 
can be triggered after one topic
+                    // creation if consumers refresh metadata at vastly 
different times. We can significantly
+                    // reduce the number of rebalances caused by a single 
topic creation by asking the consumer to
+                    // refresh metadata before re-joining the group as long as 
the refresh backoff time has
+                    // passed.
+                    if (this.metadata.timeToAllowUpdate(currentTime) == 0) {
+                        this.metadata.requestUpdate();
+                    }
                     if 
(!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                         return false;
                     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ba392c6..cec56b0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -514,6 +514,50 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void 
testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
+        // Set up a non-leader consumer with pattern subscription and a 
cluster containing one topic matching the
+        // pattern.
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+        metadata.update(TestUtils.singletonCluster(topic1, 1), 
Collections.<String>emptySet(),
+            time.milliseconds());
+        assertEquals(singleton(topic1), subscriptions.subscription());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // Instrument the test so that metadata will contain two topics after 
next refresh.
+        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.memberId().equals(consumerId) &&
+                    sync.generationId() == 1 &&
+                    sync.groupAssignment().isEmpty();
+            }
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+        partitionAssignor.prepare(singletonMap(consumerId, 
singletonList(t1p)));
+
+        // This will trigger rebalance.
+        coordinator.poll(Long.MAX_VALUE);
+
+        // Make sure that the metadata was refreshed during the rebalance and 
thus subscriptions now contain two topics.
+        final Set<String> updatedSubscriptionSet = new 
HashSet<>(Arrays.asList(topic1, topic2));
+        assertEquals(updatedSubscriptionSet, subscriptions.subscription());
+
+        // Refresh the metadata again. Since there have been no changes since 
the last refresh, it won't trigger
+        // rebalance again.
+        metadata.requestUpdate();
+        client.poll(Long.MAX_VALUE, time.milliseconds());
+        assertFalse(coordinator.rejoinNeededOrPending());
+    }
+
+    @Test
     public void testWakeupDuringJoin() {
         final String consumerId = "leader";
 

Reply via email to