This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1b0c10d2bf0d9da95dfea230d8d839e5f3769006
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 12 11:45:06 2020 -0800

    MINOR: Fix unnecessary metadata fetch before group assignment (#8095)
    
    The recent increase in the flakiness of one of the offset reset tests 
(KAFKA-9538) traces back to https://github.com/apache/kafka/pull/7941. After 
investigation, we found that following this patch, the consumer was sending an 
additional metadata request prior to performing the group assignment. This 
slight timing difference was enough to trigger the test failures. The problem 
turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no 
longer counted the local subscri [...]
    
    Without the fix, we saw 30-50% test failures locally. With it, I could no 
longer reproduce the failure. However, #6561 is probably still needed to 
improve the resilience of this test.
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../consumer/ConsumerPartitionAssignor.java        |  8 ++++++
 .../consumer/internals/ConsumerCoordinator.java    |  3 --
 .../consumer/internals/SubscriptionState.java      | 12 ++++----
 .../consumer/internals/SubscriptionStateTest.java  | 20 +++++++++++--
 .../kafka/admin/ConsumerGroupCommandTest.scala     | 14 +++++----
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 33 ++++++++++++++++------
 6 files changed, 67 insertions(+), 23 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index f9a4217..8708ea4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -154,6 +154,14 @@ public interface ConsumerPartitionAssignor {
         public ByteBuffer userData() {
             return userData;
         }
+
+        @Override
+        public String toString() {
+            return "Assignment(" +
+                    "partitions=" + partitions +
+                    (userData == null ? "" : ", userDataSize=" + 
userData.remaining()) +
+                    ')';
+        }
     }
 
     final class GroupSubscription {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index db8b255..b4d9fa5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1320,9 +1320,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             return version == other.version || 
partitionsPerTopic.equals(other.partitionsPerTopic);
         }
 
-        Map<String, Integer> partitionsPerTopic() {
-            return partitionsPerTopic;
-        }
     }
 
     private static class OffsetCommitCompletion {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4a3d15b..cdf358e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -187,15 +187,17 @@ public class SubscriptionState {
     }
 
     /**
-     * Add topics to the current group subscription. This is used by the group 
leader to ensure
+     * Set the current group subscription. This is used by the group leader to 
ensure
      * that it receives metadata updates for all topics that the group is 
interested in.
-     * @param topics The topics to add to the group subscription
+     *
+     * @param topics All topics from the group subscription
+     * @return true if the group subscription contains topics which are not 
part of the local subscription
      */
     synchronized boolean groupSubscribe(Collection<String> topics) {
         if (!partitionsAutoAssigned())
             throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
-        groupSubscription = new HashSet<>(groupSubscription);
-        return groupSubscription.addAll(topics);
+        groupSubscription = new HashSet<>(topics);
+        return !subscription.containsAll(groupSubscription);
     }
 
     /**
@@ -326,7 +328,7 @@ public class SubscriptionState {
     }
 
     /**
-     * Get the subcription topics for which metadata is required . For the 
leader, this will include
+     * Get the subscription topics for which metadata is required. For the 
leader, this will include
      * the union of the subscriptions of all group members. For followers, it 
is just that member's
      * subscription. This is used when querying topic metadata to detect the 
metadata changes which would
      * require rebalancing. The leader fetches metadata for all topics in the 
group so that it
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index c3ce02a..47d654e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -106,13 +106,29 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void testGroupSubscribe() {
+        state.subscribe(singleton(topic1), rebalanceListener);
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+
+        assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1)));
+        assertEquals(Utils.mkSet(topic, topic1), state.metadataTopics());
+
+        // `groupSubscribe` does not accumulate
+        assertFalse(state.groupSubscribe(singleton(topic1)));
+        assertEquals(singleton(topic1), state.metadataTopics());
+    }
+
+    @Test
     public void partitionAssignmentChangeOnPatternSubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        state.subscribeFromPattern(new 
HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
@@ -244,7 +260,7 @@ public class SubscriptionStateTest {
     @Test
     public void cantAssignPartitionForUnmatchedPattern() {
         state.subscribe(Pattern.compile(".*t"), rebalanceListener);
-        state.subscribeFromPattern(new 
HashSet<>(Collections.singletonList(topic)));
+        state.subscribeFromPattern(Collections.singleton(topic));
         
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
     }
 
diff --git 
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 97b638f..853b2ca 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -65,20 +65,24 @@ class ConsumerGroupCommandTest extends 
KafkaServerTestHarness {
   }
 
   def committedOffsets(topic: String = topic, group: String = group): 
Map[TopicPartition, Long] = {
-    val props = new Properties
-    props.put("bootstrap.servers", brokerList)
-    props.put("group.id", group)
-    val consumer = new KafkaConsumer(props, new StringDeserializer, new 
StringDeserializer)
+    val consumer = createNoAutoCommitConsumer(group)
     try {
       val partitions: Set[TopicPartition] = consumer.partitionsFor(topic)
         .asScala.toSet.map {partitionInfo : PartitionInfo => new 
TopicPartition(partitionInfo.topic, partitionInfo.partition)}
-
       consumer.committed(partitions.asJava).asScala.filter(_._2 != 
null).mapValues(_.offset()).toMap
     } finally {
       consumer.close()
     }
   }
 
+  def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] 
= {
+    val props = new Properties
+    props.put("bootstrap.servers", brokerList)
+    props.put("group.id", group)
+    props.put("enable.auto.commit", "false")
+    new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
+  }
+
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
     val opts = new ConsumerGroupCommandOptions(args)
     val service = new ConsumerGroupService(opts, 
Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
diff --git 
a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala 
b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 838444c..f9322b3 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,8 +16,6 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Calendar, Date, Properties}
 
-import scala.collection.Seq
-
 import joptsimple.OptionException
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
 import kafka.server.KafkaConfig
@@ -28,6 +26,9 @@ import org.apache.kafka.test
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
 class TimeConversionTests {
 
   @Test
@@ -462,12 +463,28 @@ class ResetConsumerGroupOffsetTest extends 
ConsumerGroupCommandTest {
     executor.shutdown()
   }
 
-  private def awaitConsumerProgress(topic: String = topic, group: String = 
group, count: Long): Unit = {
-    TestUtils.waitUntilTrue(() => {
-      val offsets = committedOffsets(topic = topic, group = group).values
-      count == offsets.sum
-    }, "Expected that consumer group has consumed all messages from 
topic/partition. " +
-      s"Expected offset: $count. Actual offset: ${committedOffsets(topic, 
group).values.sum}")
+  private def awaitConsumerProgress(topic: String = topic,
+                                    group: String = group,
+                                    count: Long): Unit = {
+    val consumer = createNoAutoCommitConsumer(group)
+    try {
+      val partitions = consumer.partitionsFor(topic).asScala.map { 
partitionInfo =>
+        new TopicPartition(partitionInfo.topic, partitionInfo.partition)
+      }.toSet
+
+      TestUtils.waitUntilTrue(() => {
+        val committed = consumer.committed(partitions.asJava).values.asScala
+        val total = committed.foldLeft(0L) { case (currentSum, 
offsetAndMetadata) =>
+          currentSum + Option(offsetAndMetadata).map(_.offset).getOrElse(0L)
+        }
+        total == count
+      }, "Expected that consumer group has consumed all messages from 
topic/partition. " +
+        s"Expected offset: $count. Actual offset: ${committedOffsets(topic, 
group).values.sum}")
+
+    } finally {
+      consumer.close()
+    }
+
   }
 
   private def resetAndAssertOffsets(args: Array[String],

Reply via email to