This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push: new f670e9baa30 KAFKA-17632: Fix RoundRobinPartitioner for even partition counts (#17620) f670e9baa30 is described below commit f670e9baa307d1b0abde871de17153a00d1fdd0a Author: Dániel Urbán <urb.dani...@gmail.com> AuthorDate: Tue Nov 12 15:44:36 2024 +0100 KAFKA-17632: Fix RoundRobinPartitioner for even partition counts (#17620) RoundRobinPartitioner does not handle the fact that on new batch creation, the partition method is called twice. Reviewers: Viktor Somogyi-Vass <viktorsomo...@gmail.com>, Mickael Maison <mickael.mai...@gmail.com> --- .../kafka/clients/producer/KafkaProducer.java | 2 + .../apache/kafka/clients/producer/Partitioner.java | 3 ++ .../clients/producer/RoundRobinPartitioner.java | 17 ++++++++- .../producer/RoundRobinPartitionerTest.java | 44 +++++++++++++++++++++- 4 files changed, 64 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d0aa37ec7f0..2e11f6d81f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1074,6 +1074,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (result.abortForNewBatch) { int prevPartition = partition; + // IMPORTANT NOTE: the following onNewBatch and partition calls should not be interrupted to allow + // the custom partitioner to correctly track its state onNewBatch(record.topic(), cluster, prevPartition); partition = partition(record, serializedKey, serializedValue, cluster); if (log.isTraceEnabled()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java index bcfcb2db646..3db3c3a31eb 100644 ---
a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java @@ -49,6 +49,9 @@ public interface Partitioner extends Configurable, Closeable { * <p> * Notifies the partitioner a new batch is about to be created. When using the sticky partitioner, * this method can change the chosen sticky partition for the new batch. + * <p> + * After onNewBatch, the {@link #partition(String, Object, byte[], Object, byte[], Cluster)} method is called again + * which allows the implementation to "redirect" the message on new batch creation. * @param topic The topic name * @param cluster The current cluster metadata * @param prevPartition The partition previously selected for the record that triggered a new batch diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java index be2bc24a509..1ad55fe8cda 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; import java.util.List; @@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class RoundRobinPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); + private final ThreadLocal<TopicPartition> previousPartition = new ThreadLocal<>(); public void configure(Map<String, ?> configs) {} @@ -51,6 +53,14 @@ public class RoundRobinPartitioner implements Partitioner { */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] 
valueBytes, Cluster cluster) { + TopicPartition prevPartition = previousPartition.get(); + if (prevPartition != null) { + previousPartition.remove(); + if (topic.equals(prevPartition.topic())) { + return prevPartition.partition(); + } + } + int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { @@ -68,6 +78,11 @@ public class RoundRobinPartitioner implements Partitioner { return counter.getAndIncrement(); } - public void close() {} + @SuppressWarnings("deprecation") + @Override + public void onNewBatch(String topic, Cluster cluster, int prevPartition) { + previousPartition.set(new TopicPartition(topic, prevPartition)); + } + public void close() {} } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java index 37f35b0a5a3..33af9af1643 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java @@ -97,6 +97,7 @@ public class RoundRobinPartitionerTest { assertEquals(10, partitionCount.get(2).intValue()); } + @SuppressWarnings("deprecation") @Test public void testRoundRobinWithNullKeyBytes() { final String topicA = "topicA"; @@ -113,6 +114,10 @@ public class RoundRobinPartitionerTest { Partitioner partitioner = new RoundRobinPartitioner(); for (int i = 0; i < 30; ++i) { int partition = partitioner.partition(topicA, null, null, null, null, testCluster); + // Simulate single-message batches + partitioner.onNewBatch(topicA, testCluster, partition); + int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster); + assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection"); Integer count = partitionCount.get(partition); if (null == count) 
count = 0; @@ -126,5 +131,42 @@ public class RoundRobinPartitionerTest { assertEquals(10, partitionCount.get(0).intValue()); assertEquals(10, partitionCount.get(1).intValue()); assertEquals(10, partitionCount.get(2).intValue()); - } + } + + @SuppressWarnings("deprecation") + @Test + public void testRoundRobinWithNullKeyBytesAndEvenPartitionCount() { + final String topicA = "topicA"; + final String topicB = "topicB"; + + List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES), + new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES), + new PartitionInfo(topicB, 0, NODES[0], NODES, NODES), new PartitionInfo(topicA, 3, NODES[0], NODES, NODES)); + Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions, + Collections.emptySet(), Collections.emptySet()); + + final Map<Integer, Integer> partitionCount = new HashMap<>(); + + Partitioner partitioner = new RoundRobinPartitioner(); + for (int i = 0; i < 40; ++i) { + int partition = partitioner.partition(topicA, null, null, null, null, testCluster); + // Simulate single-message batches + partitioner.onNewBatch(topicA, testCluster, partition); + int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster); + assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection"); + Integer count = partitionCount.get(partition); + if (null == count) + count = 0; + partitionCount.put(partition, count + 1); + + if (i % 5 == 0) { + partitioner.partition(topicB, null, null, null, null, testCluster); + } + } + + assertEquals(10, partitionCount.get(0).intValue()); + assertEquals(10, partitionCount.get(1).intValue()); + assertEquals(10, partitionCount.get(2).intValue()); + assertEquals(10, partitionCount.get(3).intValue()); + } }