This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new cfdd567955 KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304) cfdd567955 is described below commit cfdd567955588e134770a9145ba57800ca88313c Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Fri Jun 17 20:17:02 2022 -0700 KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304) There are some considerations embedded in this seemingly straightforward PR that I'd like to explain here. The StreamPartitioner is used to send records to three types of topics: 1) repartition topics, where the key should never be null. 2) changelog topics, where the key should never be null. 3) sink topics, where only a non-windowed key could be null and a windowed key should still never be null. Also, the StreamPartitioner is used as part of the IQ to determine which host contains a certain key, as determined by case 2) above. This PR's main goal is to remove the deprecated producer's default partitioner, while keeping those things in mind, such that: We want to make sure that for non-null keys, the default murmur2 hash behavior of the streams' partitioner stays consistent with the producer's new built-in partitioner. For null keys (which are only possible for the non-windowed default stream partitioner, and are never used for IQ), we fix the issue that we may never rotate to a new partition by setting the partition to null and hence relying on the newly introduced built-in partitioner. Reviewers: Artem Livshits <84364232+artemlivsh...@users.noreply.github.com>, Matthias J. 
Sax <matth...@confluent.io> --- .../kafka/clients/producer/KafkaProducer.java | 3 ++- .../producer/internals/BuiltInPartitioner.java | 7 +++++++ .../producer/internals/DefaultPartitioner.java | 4 +--- .../internals/WindowedStreamPartitioner.java | 10 +++++----- .../internals/DefaultStreamPartitioner.java | 22 ++++++++++++---------- .../processor/internals/StreamsMetadataState.java | 4 ++-- 6 files changed, 29 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e85d9eb8a9..74d408d9a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.producer.internals.BufferPool; +import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.ProducerMetadata; @@ -1385,7 +1386,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (serializedKey != null && !partitionerIgnoreKeys) { // hash the keyBytes to choose a partition - return Utils.toPositive(Utils.murmur2(serializedKey)) % cluster.partitionsForTopic(record.topic()).size(); + return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size()); } else { return RecordMetadata.UNKNOWN_PARTITION; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java index 1c2d10f3f6..a5805df56b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java @@ -279,6 +279,13 @@ public class BuiltInPartitioner { } } + /* + * Default hashing function to choose a partition from the serialized key bytes + */ + public static int partitionForKey(final byte[] serializedKey, final int numPartitions) { + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } + /** * The partition load stats for each topic that are used for adaptive partition distribution. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java index 2c2e79fb20..716773626c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java @@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.utils.Utils; import java.util.Map; @@ -71,8 +70,7 @@ public class DefaultPartitioner implements Partitioner { if (keyBytes == null) { return stickyPartitionCache.partition(topic, cluster); } - // hash the keyBytes to choose a partition - return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; + return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); } public void close() {} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index 8e1476a7ed..d68a52b8d0 
100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StreamPartitioner; -import static org.apache.kafka.common.utils.Utils.toPositive; - public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> { private final WindowedSerializer<K> serializer; @@ -43,9 +41,11 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window */ @Override public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) { + // for windowed key, the key bytes should never be null final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey); - // hash the keyBytes to choose a partition - return toPositive(Utils.murmur2(keyBytes)) % numPartitions; + // stick with the same built-in partitioner util functions that producer used + // to make sure its behavior is consistent with the producer + return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java index f5c9c158bc..c7d909c65a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java @@ -15,28 +15,30 @@ * limitations under the License. 
*/ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.Cluster; + +import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> { - private final Cluster cluster; private final Serializer<K> keySerializer; - @SuppressWarnings("deprecation") - private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner; - - @SuppressWarnings("deprecation") - public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) { - this.cluster = cluster; + public DefaultStreamPartitioner(final Serializer<K> keySerializer) { this.keySerializer = keySerializer; - this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner(); } @Override public Integer partition(final String topic, final K key, final V value, final int numPartitions) { final byte[] keyBytes = keySerializer.serialize(topic, key); - return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster, numPartitions); + + // if the key bytes are not available, we just return null to let the producer to decide + // which partition to send internally; otherwise stick with the same built-in partitioner + // util functions that producer used to make sure its behavior is consistent with the producer + if (keyBytes == null) { + return null; + } else { + return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index e8ee3eacf6..6850715b0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -212,7 +212,7 @@ public class StreamsMetadataState { } return getKeyQueryMetadataForKey(storeName, key, - new DefaultStreamPartitioner<>(keySerializer, clusterMetadata)); + new DefaultStreamPartitioner<>(keySerializer)); } /** @@ -225,7 +225,7 @@ public class StreamsMetadataState { Objects.requireNonNull(keySerializer, "keySerializer can't be null"); return getKeyQueryMetadataForKey(storeName, key, - new DefaultStreamPartitioner<>(keySerializer, clusterMetadata), + new DefaultStreamPartitioner<>(keySerializer), topologyName); }