This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new ef3cd68  KAFKA-12898; Owned partitions in the subscription must be 
sorted (#10878)
ef3cd68 is described below

commit ef3cd68c852daf21d8e207fa8e21c8770f4041d7
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 24 09:56:27 2021 +0200

    KAFKA-12898; Owned partitions in the subscription must be sorted (#10878)
    
    The group coordinator compares the provided subscription with the stored 
subscription based on their byte representation. So if the subscribed 
partitions are not in the same order, the group coordinator considers that 
they are different and rebalances the group. This patch ensures that the topics 
and the owned partitions are sorted.
    
    (cherry-picked from 386670227b2e5ef7633c74033e2b99222b35d7cc)
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../consumer/internals/ConsumerProtocol.java       | 25 ++++++++++----
 .../consumer/internals/ConsumerProtocolTest.java   | 40 +++++++++++++++++-----
 2 files changed, 50 insertions(+), 15 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index a05e871..d0a1c1c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.nio.BufferUnderflowException;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
@@ -28,7 +27,10 @@ import 
org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
@@ -72,13 +74,22 @@ public class ConsumerProtocol {
         version = checkSubscriptionVersion(version);
 
         ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
-        data.setTopics(subscription.topics());
+
+        List<String> topics = new ArrayList<>(subscription.topics());
+        Collections.sort(topics);
+        data.setTopics(topics);
+
         data.setUserData(subscription.userData() != null ? 
subscription.userData().duplicate() : null);
-        Map<String, List<Integer>> partitionsByTopic = 
CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions());
-        for (Map.Entry<String, List<Integer>> topicEntry : 
partitionsByTopic.entrySet()) {
-            data.ownedPartitions().add(new 
ConsumerProtocolSubscription.TopicPartition()
-                .setTopic(topicEntry.getKey())
-                .setPartitions(topicEntry.getValue()));
+
+        List<TopicPartition> ownedPartitions = new 
ArrayList<>(subscription.ownedPartitions());
+        
ownedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+        ConsumerProtocolSubscription.TopicPartition partition = null;
+        for (TopicPartition tp : ownedPartitions) {
+            if (partition == null || !partition.topic().equals(tp.topic())) {
+                partition = new 
ConsumerProtocolSubscription.TopicPartition().setTopic(tp.topic());
+                data.ownedPartitions().add(partition);
+            }
+            partition.partitions().add(tp.partition());
         }
 
         return MessageUtil.toVersionPrefixedByteBuffer(version, data);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 35a4a6f..a2d5120 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -58,7 +58,7 @@ public class ConsumerProtocolTest {
             ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription, version);
             Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
 
-            assertEquals(subscription.topics(), parsedSubscription.topics());
+            assertEquals(toSet(subscription.topics()), 
toSet(parsedSubscription.topics()));
             assertEquals(subscription.userData(), 
parsedSubscription.userData());
             assertFalse(parsedSubscription.groupInstanceId().isPresent());
 
@@ -75,7 +75,7 @@ public class ConsumerProtocolTest {
         Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), ByteBuffer.wrap(new byte[0]));
         ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription);
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), 
toSet(parsedSubscription.topics()));
         assertEquals(0, parsedSubscription.userData().limit());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
     }
@@ -87,7 +87,7 @@ public class ConsumerProtocolTest {
 
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
         parsedSubscription.setGroupInstanceId(groupInstanceId);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), 
toSet(parsedSubscription.topics()));
         assertEquals(0, parsedSubscription.userData().limit());
         assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
@@ -97,16 +97,40 @@ public class ConsumerProtocolTest {
         Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), null);
         ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription);
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), 
toSet(parsedSubscription.topics()));
         assertNull(parsedSubscription.userData());
     }
 
     @Test
+    public void serializeSubscriptionShouldOrderTopics() {
+        assertEquals(
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("foo", "bar"), null, 
Arrays.asList(tp1, tp2))
+            ),
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("bar", "foo"), null, 
Arrays.asList(tp1, tp2))
+            )
+        );
+    }
+
+    @Test
+    public void serializeSubscriptionShouldOrderOwnedPartitions() {
+        assertEquals(
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("foo", "bar"), null, 
Arrays.asList(tp1, tp2))
+            ),
+            ConsumerProtocol.serializeSubscription(
+                new Subscription(Arrays.asList("foo", "bar"), null, 
Arrays.asList(tp2, tp1))
+            )
+        );
+    }
+
+    @Test
     public void deserializeOldSubscriptionVersion() {
         Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), null);
         ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription, (short) 0);
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
-        assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(parsedSubscription.topics()), 
toSet(parsedSubscription.topics()));
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
     }
@@ -118,7 +142,7 @@ public class ConsumerProtocolTest {
         // ignore the version assuming it is the old byte code, as it will 
blindly deserialize as V0
         ConsumerProtocol.deserializeVersion(buffer);
         Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer, (short) 0);
-        assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(toSet(subscription.topics()), 
toSet(parsedSubscription.topics()));
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -156,8 +180,8 @@ public class ConsumerProtocolTest {
 
         Subscription subscription = 
ConsumerProtocol.deserializeSubscription(buffer);
         subscription.setGroupInstanceId(groupInstanceId);
-        assertEquals(Collections.singletonList("topic"), 
subscription.topics());
-        assertEquals(Collections.singletonList(tp2), 
subscription.ownedPartitions());
+        assertEquals(Collections.singleton("topic"), 
toSet(subscription.topics()));
+        assertEquals(Collections.singleton(tp2), 
toSet(subscription.ownedPartitions()));
         assertEquals(groupInstanceId, subscription.groupInstanceId());
     }
 

Reply via email to