guozhangwang commented on code in PR #12304:
URL: https://github.com/apache/kafka/pull/12304#discussion_r899425425
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -279,6 +279,13 @@ public int partition() {
}
}
+ /*
+ * Default hashing function to choose a partition from the serialized key bytes
+ */
+ public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
Review Comment:
The intention of this static function is to ensure that every default partitioner, including the one in Streams, behaves identically. If we ever change the algorithm in the future, the change then applies to all of them at once.
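To make the "one shared hashing function" idea concrete, here is a minimal standalone sketch of a key-to-partition mapping. It assumes the default key hashing is 32-bit murmur2 over the serialized key, with the sign bit cleared before taking the modulo (mirroring what Kafka's `Utils.murmur2` and `Utils.toPositive` are understood to do). The class name `KeyPartitionSketch` is hypothetical, and this is an illustration, not the Kafka source.

```java
// Hypothetical illustration of a murmur2-based partitionForKey; assumed to mirror
// Kafka's Utils.murmur2 / Utils.toPositive, but not copied from the Kafka codebase.
final class KeyPartitionSketch {
    private KeyPartitionSketch() { }

    // 32-bit murmur2 hash of the serialized key bytes.
    static int murmur2(final byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        // Mixing constants of the murmur2 algorithm.
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;

        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
            default:
                break;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below never yields a negative partition.
    static int toPositive(final int number) {
        return number & 0x7fffffff;
    }

    // Same key bytes and partition count always map to the same partition.
    static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
        return toPositive(murmur2(serializedKey)) % numPartitions;
    }
}
```

Because both the producer and Streams would call one such function, a keyed record is routed to the same partition regardless of which component computed it.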
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -15,28 +15,30 @@
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.Cluster;
+
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
- private final Cluster cluster;
private final Serializer<K> keySerializer;
- @SuppressWarnings("deprecation")
- private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
-
- @SuppressWarnings("deprecation")
- public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
- this.cluster = cluster;
+ public DefaultStreamPartitioner(final Serializer<K> keySerializer) {
this.keySerializer = keySerializer;
- this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();
}
@Override
public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
final byte[] keyBytes = keySerializer.serialize(topic, key);
- return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster, numPartitions);
+
+ // if the key bytes are not available, we just return null to let the producer to decide
+ // which partition to send internally; otherwise stick with the same built-in partitioner
+ // util functions that producer used to make sure its behavior is consistent with the producer
+ if (keyBytes == null) {
Review Comment:
This is the intended fix for relying on the new built-in partitioner, under the assumption that a record with a null key is never used for IQ (Interactive Queries) purposes.
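The null-key fallthrough described above can be sketched as follows. This is a hedged, standalone illustration rather than the actual `DefaultStreamPartitioner`: `KeySerializer` is a hypothetical stand-in for Kafka's `Serializer<K>`, and the shared hashing function is injected as a `BiFunction` (in the PR that role is played by `BuiltInPartitioner.partitionForKey`).

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Serializer<K>.
interface KeySerializer<K> {
    byte[] serialize(String topic, K key);
}

// Sketch of a stream partitioner that defers null keys to the producer.
final class NullKeyAwareStreamPartitioner<K, V> {
    private final KeySerializer<K> keySerializer;
    // Shared key -> partition function (assumed; BuiltInPartitioner.partitionForKey in the PR).
    private final BiFunction<byte[], Integer, Integer> partitionForKey;

    NullKeyAwareStreamPartitioner(final KeySerializer<K> keySerializer,
                                  final BiFunction<byte[], Integer, Integer> partitionForKey) {
        this.keySerializer = keySerializer;
        this.partitionForKey = partitionForKey;
    }

    Integer partition(final String topic, final K key, final V value, final int numPartitions) {
        final byte[] keyBytes = keySerializer.serialize(topic, key);
        if (keyBytes == null) {
            // No key bytes: return null so the producer chooses the partition itself.
            return null;
        }
        // Key present: reuse the producer's hashing so keyed records land on the same partition.
        return partitionForKey.apply(keyBytes, numPartitions);
    }
}
```

Returning `null` rather than picking a partition in Streams keeps the producer free to apply its own logic for keyless records, which is only safe because, per the assumption above, such records are never looked up by key.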