This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new ef3cd68 KAFKA-12898; Owned partitions in the subscription must be
sorted (#10878)
ef3cd68 is described below
commit ef3cd68c852daf21d8e207fa8e21c8770f4041d7
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 24 09:56:27 2021 +0200
KAFKA-12898; Owned partitions in the subscription must be sorted (#10878)
The group coordinator compares the provided subscription with the stored
subscription based on their byte representation. So if the topics or the owned
partitions are not in the same order, the group coordinator would consider that
they are different and rebalance the group. This patch ensures that the topics
and the owned partitions are sorted.
(cherry-picked from 386670227b2e5ef7633c74033e2b99222b35d7cc)
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../consumer/internals/ConsumerProtocol.java | 25 ++++++++++----
.../consumer/internals/ConsumerProtocolTest.java | 40 +++++++++++++++++-----
2 files changed, 50 insertions(+), 15 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index a05e871..d0a1c1c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import java.nio.BufferUnderflowException;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;
@@ -28,7 +27,10 @@ import
org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -72,13 +74,22 @@ public class ConsumerProtocol {
version = checkSubscriptionVersion(version);
ConsumerProtocolSubscription data = new ConsumerProtocolSubscription();
- data.setTopics(subscription.topics());
+
+ List<String> topics = new ArrayList<>(subscription.topics());
+ Collections.sort(topics);
+ data.setTopics(topics);
+
data.setUserData(subscription.userData() != null ?
subscription.userData().duplicate() : null);
- Map<String, List<Integer>> partitionsByTopic =
CollectionUtils.groupPartitionsByTopic(subscription.ownedPartitions());
- for (Map.Entry<String, List<Integer>> topicEntry :
partitionsByTopic.entrySet()) {
- data.ownedPartitions().add(new
ConsumerProtocolSubscription.TopicPartition()
- .setTopic(topicEntry.getKey())
- .setPartitions(topicEntry.getValue()));
+
+ List<TopicPartition> ownedPartitions = new
ArrayList<>(subscription.ownedPartitions());
+
ownedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+ ConsumerProtocolSubscription.TopicPartition partition = null;
+ for (TopicPartition tp : ownedPartitions) {
+ if (partition == null || !partition.topic().equals(tp.topic())) {
+ partition = new
ConsumerProtocolSubscription.TopicPartition().setTopic(tp.topic());
+ data.ownedPartitions().add(partition);
+ }
+ partition.partitions().add(tp.partition());
}
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 35a4a6f..a2d5120 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -58,7 +58,7 @@ public class ConsumerProtocolTest {
ByteBuffer buffer =
ConsumerProtocol.serializeSubscription(subscription, version);
Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer);
- assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(toSet(subscription.topics()),
toSet(parsedSubscription.topics()));
assertEquals(subscription.userData(),
parsedSubscription.userData());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -75,7 +75,7 @@ public class ConsumerProtocolTest {
Subscription subscription = new Subscription(Arrays.asList("foo",
"bar"), ByteBuffer.wrap(new byte[0]));
ByteBuffer buffer =
ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer);
- assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(toSet(subscription.topics()),
toSet(parsedSubscription.topics()));
assertEquals(0, parsedSubscription.userData().limit());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
}
@@ -87,7 +87,7 @@ public class ConsumerProtocolTest {
Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer);
parsedSubscription.setGroupInstanceId(groupInstanceId);
- assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(toSet(subscription.topics()),
toSet(parsedSubscription.topics()));
assertEquals(0, parsedSubscription.userData().limit());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@@ -97,16 +97,40 @@ public class ConsumerProtocolTest {
Subscription subscription = new Subscription(Arrays.asList("foo",
"bar"), null);
ByteBuffer buffer =
ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer);
- assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(toSet(subscription.topics()),
toSet(parsedSubscription.topics()));
assertNull(parsedSubscription.userData());
}
@Test
+ public void serializeSubscriptionShouldOrderTopics() {
+ assertEquals(
+ ConsumerProtocol.serializeSubscription(
+ new Subscription(Arrays.asList("foo", "bar"), null,
Arrays.asList(tp1, tp2))
+ ),
+ ConsumerProtocol.serializeSubscription(
+ new Subscription(Arrays.asList("bar", "foo"), null,
Arrays.asList(tp1, tp2))
+ )
+ );
+ }
+
+ @Test
+ public void serializeSubscriptionShouldOrderOwnedPartitions() {
+ assertEquals(
+ ConsumerProtocol.serializeSubscription(
+ new Subscription(Arrays.asList("foo", "bar"), null,
Arrays.asList(tp1, tp2))
+ ),
+ ConsumerProtocol.serializeSubscription(
+ new Subscription(Arrays.asList("foo", "bar"), null,
Arrays.asList(tp2, tp1))
+ )
+ );
+ }
+
+ @Test
public void deserializeOldSubscriptionVersion() {
Subscription subscription = new Subscription(Arrays.asList("foo",
"bar"), null);
ByteBuffer buffer =
ConsumerProtocol.serializeSubscription(subscription, (short) 0);
Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer);
- assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
+ assertEquals(toSet(parsedSubscription.topics()),
toSet(parsedSubscription.topics()));
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
}
@@ -118,7 +142,7 @@ public class ConsumerProtocolTest {
// ignore the version assuming it is the old byte code, as it will
blindly deserialize as V0
ConsumerProtocol.deserializeVersion(buffer);
Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer, (short) 0);
- assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(toSet(subscription.topics()),
toSet(parsedSubscription.topics()));
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -156,8 +180,8 @@ public class ConsumerProtocolTest {
Subscription subscription =
ConsumerProtocol.deserializeSubscription(buffer);
subscription.setGroupInstanceId(groupInstanceId);
- assertEquals(Collections.singletonList("topic"),
subscription.topics());
- assertEquals(Collections.singletonList(tp2),
subscription.ownedPartitions());
+ assertEquals(Collections.singleton("topic"),
toSet(subscription.topics()));
+ assertEquals(Collections.singleton(tp2),
toSet(subscription.ownedPartitions()));
assertEquals(groupInstanceId, subscription.groupInstanceId());
}