This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfdd567955 KAFKA-13880: Remove DefaultPartitioner from 
StreamPartitioner (#12304)
cfdd567955 is described below

commit cfdd567955588e134770a9145ba57800ca88313c
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Fri Jun 17 20:17:02 2022 -0700

    KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304)
    
    There are some considerations embedded in this seemingly straightforward PR 
that I'd like to explain here. The StreamPartitioner is used to send records to 
three types of topics:
    
    1) repartition topics, where key should never be null.
    2) changelog topics, where key should never be null.
    3) sink topics, where only a non-windowed key can be null; a windowed key 
should still never be null.
    Also, the StreamPartitioner is used as part of the IQ to determine which 
host contains a certain key, as determined by the case 2) above.
    
    This PR's main goal is to remove the producer's deprecated default 
partitioner while keeping the points above in mind, such that:
    
    For non-null keys, we want to make sure the default murmur2 hash behavior 
of the streams' partitioner stays consistent with the producer's new built-in 
partitioner.
    For null keys (which are only possible with the non-windowed default stream 
partitioner, and are never used for IQ), we fix the issue that we may never 
rotate to a new partition by returning null as the partition, hence relying on 
the newly introduced built-in partitioner.
    
    Reviewers: Artem Livshits 
<84364232+artemlivsh...@users.noreply.github.com>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../kafka/clients/producer/KafkaProducer.java      |  3 ++-
 .../producer/internals/BuiltInPartitioner.java     |  7 +++++++
 .../producer/internals/DefaultPartitioner.java     |  4 +---
 .../internals/WindowedStreamPartitioner.java       | 10 +++++-----
 .../internals/DefaultStreamPartitioner.java        | 22 ++++++++++++----------
 .../processor/internals/StreamsMetadataState.java  |  4 ++--
 6 files changed, 29 insertions(+), 21 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e85d9eb8a9..74d408d9a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
 import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetadata;
@@ -1385,7 +1386,7 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
 
         if (serializedKey != null && !partitionerIgnoreKeys) {
             // hash the keyBytes to choose a partition
-            return Utils.toPositive(Utils.murmur2(serializedKey)) % 
cluster.partitionsForTopic(record.topic()).size();
+            return BuiltInPartitioner.partitionForKey(serializedKey, 
cluster.partitionsForTopic(record.topic()).size());
         } else {
             return RecordMetadata.UNKNOWN_PARTITION;
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
index 1c2d10f3f6..a5805df56b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
@@ -279,6 +279,13 @@ public class BuiltInPartitioner {
         }
     }
 
+    /*
+     * Default hashing function to choose a partition from the serialized key 
bytes
+     */
+    public static int partitionForKey(final byte[] serializedKey, final int 
numPartitions) {
+        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+    }
+
     /**
      * The partition load stats for each topic that are used for adaptive 
partition distribution.
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 2c2e79fb20..716773626c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.utils.Utils;
 
 import java.util.Map;
 
@@ -71,8 +70,7 @@ public class DefaultPartitioner implements Partitioner {
         if (keyBytes == null) {
             return stickyPartitionCache.partition(topic, cluster);
         }
-        // hash the keyBytes to choose a partition
-        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+        return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
     }
 
     public void close() {}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 8e1476a7ed..d68a52b8d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
-import static org.apache.kafka.common.utils.Utils.toPositive;
-
 public class WindowedStreamPartitioner<K, V> implements 
StreamPartitioner<Windowed<K>, V> {
 
     private final WindowedSerializer<K> serializer;
@@ -43,9 +41,11 @@ public class WindowedStreamPartitioner<K, V> implements 
StreamPartitioner<Window
      */
     @Override
     public Integer partition(final String topic, final Windowed<K> 
windowedKey, final V value, final int numPartitions) {
+        // for windowed key, the key bytes should never be null
         final byte[] keyBytes = serializer.serializeBaseKey(topic, 
windowedKey);
 
-        // hash the keyBytes to choose a partition
-        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+        // stick with the same built-in partitioner util functions that 
producer used
+        // to make sure its behavior is consistent with the producer
+        return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
index f5c9c158bc..c7d909c65a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -15,28 +15,30 @@
  * limitations under the License.
  */
 package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.Cluster;
+
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> 
{
 
-    private final Cluster cluster;
     private final Serializer<K> keySerializer;
 
-    @SuppressWarnings("deprecation")
-    private final 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
defaultPartitioner;
-
-    @SuppressWarnings("deprecation")
-    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final 
Cluster cluster) {
-        this.cluster = cluster;
+    public DefaultStreamPartitioner(final Serializer<K> keySerializer) {
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new 
org.apache.kafka.clients.producer.internals.DefaultPartitioner();
     }
 
     @Override
     public Integer partition(final String topic, final K key, final V value, 
final int numPartitions) {
         final byte[] keyBytes = keySerializer.serialize(topic, key);
-        return defaultPartitioner.partition(topic, key, keyBytes, value, null, 
cluster, numPartitions);
+
+        // if the key bytes are not available, we just return null to let the 
producer to decide
+        // which partition to send internally; otherwise stick with the same 
built-in partitioner
+        // util functions that producer used to make sure its behavior is 
consistent with the producer
+        if (keyBytes == null) {
+            return null;
+        } else {
+            return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index e8ee3eacf6..6850715b0a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -212,7 +212,7 @@ public class StreamsMetadataState {
         }
         return getKeyQueryMetadataForKey(storeName,
                                          key,
-                                         new 
DefaultStreamPartitioner<>(keySerializer, clusterMetadata));
+                                         new 
DefaultStreamPartitioner<>(keySerializer));
     }
 
     /**
@@ -225,7 +225,7 @@ public class StreamsMetadataState {
         Objects.requireNonNull(keySerializer, "keySerializer can't be null");
         return getKeyQueryMetadataForKey(storeName,
                                          key,
-                                         new 
DefaultStreamPartitioner<>(keySerializer, clusterMetadata),
+                                         new 
DefaultStreamPartitioner<>(keySerializer),
                                          topologyName);
     }
 

Reply via email to