This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a932520  KAFKA-7126; Reduce number of rebalances for large consumer group after a topic is created
a932520 is described below

commit a932520135d42c7d9731064d96c21ab2fc5de696
Author: Jon Lee <[email protected]>
AuthorDate: Thu Jul 26 10:28:07 2018 -0700

    KAFKA-7126; Reduce number of rebalances for large consumer group after a topic is created
    
    This patch forces a metadata update for consumers with pattern subscription at
    the beginning of a rebalance (retry.backoff.ms is respected). This prevents such
    consumers from detecting subscription changes (e.g., newly created topics) at
    different times and independently triggering multiple unnecessary rebalances.
    KAFKA-7126 contains detailed scenarios and rationale.
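
    As an illustration only (not part of this patch), a consumer affected by this change
    subscribes with a regex pattern along the following lines; the broker address, group id,
    topic regex, and backoff value below are placeholder assumptions:

        import java.time.Duration;
        import java.util.Collection;
        import java.util.Properties;
        import java.util.regex.Pattern;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.TopicPartition;

        public class PatternSubscribeExample {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-group");           // placeholder group id
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
                // retry.backoff.ms also bounds how often metadata may be refreshed at the start of a rebalance
                props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 100L);               // placeholder backoff

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    // Pattern subscription: topics created later that match the regex are picked up
                    // on a metadata refresh, which triggers a rebalance of the whole group.
                    consumer.subscribe(Pattern.compile("events-.*"), new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
                    });
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    System.out.println("Fetched " + records.count() + " records");
                }
            }
        }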
    
    Author: Jon Lee <[email protected]>
    
    Reviewers: Jason Gustafson <[email protected]>, Ted Yu <[email protected]>, Dong Lin <[email protected]>
    
    Closes #5408 from jonlee2/KAFKA-7126
---
 .../java/org/apache/kafka/clients/Metadata.java    | 16 +++++++-
 .../consumer/internals/ConsumerCoordinator.java    | 10 +++++
 .../internals/ConsumerCoordinatorTest.java         | 44 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 6c663cf..1413eac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -125,14 +125,26 @@ public final class Metadata implements Closeable {
     }
 
     /**
+     * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till the cluster info can be updated again
+     */
+    public synchronized long timeToAllowUpdate(long nowMs) {
+        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+    }
+
+    /**
      * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
      * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
      * is now
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till updating the cluster info
      */
     public synchronized long timeToNextUpdate(long nowMs) {
         long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
-        return Math.max(timeToExpire, timeToAllowUpdate);
+        return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index f9b77e9..8f25d6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -328,6 +328,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 // we need to ensure that the metadata is fresh before joining initially. This ensures
                 // that we have matched the pattern against the cluster's topics at least once before joining.
                 if (subscriptions.hasPatternSubscription()) {
+                    // For consumer group that uses pattern-based subscription, after a topic is created,
+                    // any consumer that discovers the topic after metadata refresh can trigger rebalance
+                    // across the entire consumer group. Multiple rebalances can be triggered after one topic
+                    // creation if consumers refresh metadata at vastly different times. We can significantly
+                    // reduce the number of rebalances caused by single topic creation by asking consumer to
+                    // refresh metadata before re-joining the group as long as the refresh backoff time has
+                    // passed.
+                    if (this.metadata.timeToAllowUpdate(currentTime) == 0) {
+                        this.metadata.requestUpdate();
+                    }
                     if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                         return false;
                     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ba392c6..cec56b0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -514,6 +514,50 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
+        // Set up a non-leader consumer with pattern subscription and a cluster containing one topic matching the
+        // pattern.
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(),
+            time.milliseconds());
+        assertEquals(singleton(topic1), subscriptions.subscription());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // Instrument the test so that metadata will contain two topics after next refresh.
+        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.memberId().equals(consumerId) &&
+                    sync.generationId() == 1 &&
+                    sync.groupAssignment().isEmpty();
+            }
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+        // This will trigger rebalance.
+        coordinator.poll(Long.MAX_VALUE);
+
+        // Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain two topics.
+        final Set<String> updatedSubscriptionSet = new HashSet<>(Arrays.asList(topic1, topic2));
+        assertEquals(updatedSubscriptionSet, subscriptions.subscription());
+
+        // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
+        // rebalance again.
+        metadata.requestUpdate();
+        client.poll(Long.MAX_VALUE, time.milliseconds());
+        assertFalse(coordinator.rejoinNeededOrPending());
+    }
+
+    @Test
     public void testWakeupDuringJoin() {
         final String consumerId = "leader";
 
