This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 697f156  MINOR: Fix unnecessary metadata fetch before group assignment 
(#8095)
697f156 is described below

commit 697f1567bef5709028c24ab14fdf85f0e26551d4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 12 11:45:06 2020 -0800

    MINOR: Fix unnecessary metadata fetch before group assignment (#8095)
    
    The recent increase in the flakiness of one of the offset reset tests 
(KAFKA-9538) traces back to https://github.com/apache/kafka/pull/7941. After 
investigation, we found that following this patch, the consumer was sending an 
additional metadata request prior to performing the group assignment. This 
slight timing difference was enough to trigger the test failures. The problem 
turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no 
longer counted the local subscri [...]
    
    Without the fix, we saw 30-50% test failures locally. With it, I could no 
longer reproduce the failure. However, #6561 is probably still needed to 
improve the resilience of this test.
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../consumer/internals/SubscriptionState.java      | 12 ++++++-----
 .../consumer/internals/SubscriptionStateTest.java  | 20 ++++++++++++++++--
 .../kafka/admin/ConsumerGroupCommandTest.scala     | 16 ++++++++++-----
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 24 ++++++++++++++++------
 4 files changed, 54 insertions(+), 18 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index f4f4d08..89712d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -160,15 +160,17 @@ public class SubscriptionState {
     }
 
     /**
-     * Add topics to the current group subscription. This is used by the group 
leader to ensure
+     * Set the current group subscription. This is used by the group leader to 
ensure
      * that it receives metadata updates for all topics that the group is 
interested in.
-     * @param topics The topics to add to the group subscription
+     *
+     * @param topics All topics from the group subscription
+     * @return true if the group subscription contains topics which are not 
part of the local subscription
      */
     synchronized boolean groupSubscribe(Collection<String> topics) {
         if (!partitionsAutoAssigned())
             throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
-        groupSubscription = new HashSet<>(groupSubscription);
-        return groupSubscription.addAll(topics);
+        groupSubscription = new HashSet<>(topics);
+        return !subscription.containsAll(groupSubscription);
     }
 
     /**
@@ -293,7 +295,7 @@ public class SubscriptionState {
     }
 
     /**
-     * Get the subcription topics for which metadata is required . For the 
leader, this will include
+     * Get the subscription topics for which metadata is required. For the 
leader, this will include
      * the union of the subscriptions of all group members. For followers, it 
is just that member's
      * subscription. This is used when querying topic metadata to detect the 
metadata changes which would
      * require rebalancing. The leader fetches metadata for all topics in the 
group so that it
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 217b3ce..35ef154 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -105,13 +105,29 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void testGroupSubscribe() {
+        state.subscribe(singleton(topic1), rebalanceListener);
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1)));
+        assertEquals(Utils.mkSet(topic, topic1), state.metadataTopics());
+
+        // `groupSubscribe` does not accumulate
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+    }
+
+    @Test
     public void partitionAssignmentChangeOnPatternSubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.subscribeFromPattern(new 
HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
@@ -230,7 +246,7 @@ public class SubscriptionStateTest {
     @Test
     public void cantAssignPartitionForUnmatchedPattern() {
         state.subscribe(Pattern.compile(".*t"), rebalanceListener);
-        state.subscribeFromPattern(new 
HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         
assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0)));
     }
 
diff --git 
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index d5eea98..c830ccd 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -64,12 +64,10 @@ class ConsumerGroupCommandTest extends 
KafkaServerTestHarness {
   }
 
   def committedOffsets(topic: String = topic, group: String = group): 
Map[TopicPartition, Long] = {
-    val props = new Properties
-    props.put("bootstrap.servers", brokerList)
-    props.put("group.id", group)
-    val consumer = new KafkaConsumer(props, new StringDeserializer, new 
StringDeserializer)
+    val consumer = createNoAutoCommitConsumer(group)
     try {
-      consumer.partitionsFor(topic).asScala.flatMap { partitionInfo =>
+      val partitions = consumer.partitionsFor(topic).asScala.toSet
+      partitions.flatMap { partitionInfo =>
         val tp = new TopicPartition(partitionInfo.topic, 
partitionInfo.partition)
         val committed = consumer.committed(tp)
         if (committed == null)
@@ -82,6 +80,14 @@ class ConsumerGroupCommandTest extends 
KafkaServerTestHarness {
     }
   }
 
+  def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] 
= {
+    val props = new Properties
+    props.put("bootstrap.servers", brokerList)
+    props.put("group.id", group)
+    props.put("enable.auto.commit", "false")
+    new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
+  }
+
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
     val opts = new ConsumerGroupCommandOptions(args)
     val service = new ConsumerGroupService(opts)
diff --git 
a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala 
b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index baf1d05..d69ec9d 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -25,6 +25,8 @@ import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.Seq
+
 class TimeConversionTests {
 
   @Test
@@ -466,12 +468,22 @@ class ResetConsumerGroupOffsetTest extends 
ConsumerGroupCommandTest {
     executor.shutdown()
   }
 
-  private def awaitConsumerProgress(topic: String = topic, group: String = 
group, count: Long): Unit = {
-    TestUtils.waitUntilTrue(() => {
-      val offsets = committedOffsets(topic = topic, group = group).values
-      count == offsets.sum
-    }, "Expected that consumer group has consumed all messages from 
topic/partition. " +
-      s"Expected offset: $count. Actual offset: ${committedOffsets(topic, 
group).values.sum}")
+  private def awaitConsumerProgress(topic: String = topic,
+                                    group: String = group,
+                                    count: Long): Unit = {
+    val consumer = createNoAutoCommitConsumer(group)
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val committed = committedOffsets(topic, group)
+        val total = committed.values.sum
+        total == count
+      }, "Expected that consumer group has consumed all messages from 
topic/partition. " +
+        s"Expected offset: $count. Actual offset: ${committedOffsets(topic, 
group).values.sum}")
+
+    } finally {
+      consumer.close()
+    }
+
   }
 
   private def resetAndAssertOffsets(args: Array[String],

Reply via email to