This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new f670e9baa30 KAFKA-17632: Fix RoundRobinPartitioner for even partition counts (#17620)
f670e9baa30 is described below

commit f670e9baa307d1b0abde871de17153a00d1fdd0a
Author: Dániel Urbán <urb.dani...@gmail.com>
AuthorDate: Tue Nov 12 15:44:36 2024 +0100

    KAFKA-17632: Fix RoundRobinPartitioner for even partition counts (#17620)
    
    RoundRobinPartitioner does not account for the fact that, on new batch
    creation, the partition method is called twice (once before and once after
    onNewBatch).
    
    Reviewers: Viktor Somogyi-Vass <viktorsomo...@gmail.com>, Mickael Maison 
<mickael.mai...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |  2 +
 .../apache/kafka/clients/producer/Partitioner.java |  3 ++
 .../clients/producer/RoundRobinPartitioner.java    | 17 ++++++++-
 .../producer/RoundRobinPartitionerTest.java        | 44 +++++++++++++++++++++-
 4 files changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d0aa37ec7f0..2e11f6d81f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1074,6 +1074,8 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
 
             if (result.abortForNewBatch) {
                 int prevPartition = partition;
+                // IMPORTANT NOTE: the following onNewBatch and partition calls should not be interrupted to allow
+                // the custom partitioner to correctly track its state
                 onNewBatch(record.topic(), cluster, prevPartition);
                 partition = partition(record, serializedKey, serializedValue, 
cluster);
                 if (log.isTraceEnabled()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
index bcfcb2db646..3db3c3a31eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -49,6 +49,9 @@ public interface Partitioner extends Configurable, Closeable {
      * <p>
      * Notifies the partitioner a new batch is about to be created. When using 
the sticky partitioner,
      * this method can change the chosen sticky partition for the new batch.
+     * <p>
+     * After onNewBatch, the {@link #partition(String, Object, byte[], Object, byte[], Cluster)} method is called again,
+     * which allows the implementation to "redirect" the message on new batch creation.
      * @param topic The topic name
      * @param cluster The current cluster metadata
      * @param prevPartition The partition previously selected for the record 
that triggered a new batch
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
index be2bc24a509..1ad55fe8cda 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.List;
@@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class RoundRobinPartitioner implements Partitioner {
     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new 
ConcurrentHashMap<>();
+    private final ThreadLocal<TopicPartition> previousPartition = new 
ThreadLocal<>();
 
     public void configure(Map<String, ?> configs) {}
 
@@ -51,6 +53,14 @@ public class RoundRobinPartitioner implements Partitioner {
      */
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes, Cluster cluster) {
+        TopicPartition prevPartition = previousPartition.get();
+        if (prevPartition != null) {
+            previousPartition.remove();
+            if (topic.equals(prevPartition.topic())) {
+                return prevPartition.partition();
+            }
+        }
+
         int nextValue = nextValue(topic);
         List<PartitionInfo> availablePartitions = 
cluster.availablePartitionsForTopic(topic);
         if (!availablePartitions.isEmpty()) {
@@ -68,6 +78,11 @@ public class RoundRobinPartitioner implements Partitioner {
         return counter.getAndIncrement();
     }
 
-    public void close() {}
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+        previousPartition.set(new TopicPartition(topic, prevPartition));
+    }
 
+    public void close() {}
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
index 37f35b0a5a3..33af9af1643 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
@@ -97,6 +97,7 @@ public class RoundRobinPartitionerTest {
         assertEquals(10, partitionCount.get(2).intValue());
     }
     
+    @SuppressWarnings("deprecation")
     @Test
     public void testRoundRobinWithNullKeyBytes() {
         final String topicA = "topicA";
@@ -113,6 +114,10 @@ public class RoundRobinPartitionerTest {
         Partitioner partitioner = new RoundRobinPartitioner();
         for (int i = 0; i < 30; ++i) {
             int partition = partitioner.partition(topicA, null, null, null, 
null, testCluster);
+            // Simulate single-message batches
+            partitioner.onNewBatch(topicA, testCluster, partition);
+            int nextPartition = partitioner.partition(topicA, null, null, 
null, null, testCluster);
+            assertEquals(partition, nextPartition, "New batch creation should 
not affect the partition selection");
             Integer count = partitionCount.get(partition);
             if (null == count)
                 count = 0;
@@ -126,5 +131,42 @@ public class RoundRobinPartitionerTest {
         assertEquals(10, partitionCount.get(0).intValue());
         assertEquals(10, partitionCount.get(1).intValue());
         assertEquals(10, partitionCount.get(2).intValue());
-    }    
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testRoundRobinWithNullKeyBytesAndEvenPartitionCount() {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 
0, NODES[0], NODES, NODES),
+                new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new 
PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
+                new PartitionInfo(topicB, 0, NODES[0], NODES, NODES), new 
PartitionInfo(topicA, 3, NODES[0], NODES, NODES));
+        Cluster testCluster = new Cluster("clusterId", asList(NODES[0], 
NODES[1], NODES[2]), allPartitions,
+                Collections.emptySet(), Collections.emptySet());
+
+        final Map<Integer, Integer> partitionCount = new HashMap<>();
+
+        Partitioner partitioner = new RoundRobinPartitioner();
+        for (int i = 0; i < 40; ++i) {
+            int partition = partitioner.partition(topicA, null, null, null, 
null, testCluster);
+            // Simulate single-message batches
+            partitioner.onNewBatch(topicA, testCluster, partition);
+            int nextPartition = partitioner.partition(topicA, null, null, 
null, null, testCluster);
+            assertEquals(partition, nextPartition, "New batch creation should 
not affect the partition selection");
+            Integer count = partitionCount.get(partition);
+            if (null == count)
+                count = 0;
+            partitionCount.put(partition, count + 1);
+
+            if (i % 5 == 0) {
+                partitioner.partition(topicB, null, null, null, null, 
testCluster);
+            }
+        }
+
+        assertEquals(10, partitionCount.get(0).intValue());
+        assertEquals(10, partitionCount.get(1).intValue());
+        assertEquals(10, partitionCount.get(2).intValue());
+        assertEquals(10, partitionCount.get(3).intValue());
+    }
 }

Reply via email to