[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500489#comment-17500489
 ] 

Qingsheng Ren commented on FLINK-26033:
---------------------------------------

Thanks [~tinny] for the explanation. I think this is related to KAFKA-9965, 
which is still unresolved. Implementing our own partitioner looks good to me. 
It would also preserve backward compatibility, since Kafka's built-in 
RoundRobinPartitioner is only available in Kafka 2.4.0 and later.
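
For illustration only, a Flink-side round-robin partitioner could look roughly 
like the sketch below. The class name RoundRobinPartitionerSketch is 
hypothetical. The method mirrors the shape of FlinkKafkaPartitioner#partition 
(record, key, value, target topic, partition ids), but the class deliberately 
does not extend FlinkKafkaPartitioner so that it compiles with the JDK alone. 
It is not necessarily what the eventual fix looks like.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of a round-robin partitioner. A real implementation
 * would extend FlinkKafkaPartitioner<T>; that is left out on purpose here.
 */
class RoundRobinPartitionerSketch<T> {
    // One counter per target topic, so every topic cycles independently.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        int next = counters
                .computeIfAbsent(targetTopic, t -> new AtomicInteger(0))
                .getAndIncrement();
        // Clear the sign bit so the index stays valid after the counter overflows.
        return partitions[(next & 0x7fffffff) % partitions.length];
    }
}
```

The key and value are ignored on purpose: unlike the default partitioner, a 
round-robin strategy spreads records evenly whether or not a key is present.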

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26033
>                 URL: https://issues.apache.org/jira/browse/FLINK-26033
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>            Reporter: shizhengchao
>            Assignee: shizhengchao
>            Priority: Major
>              Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                                 // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both options fall back to Kafka's default partitioner, but DefaultPartitioner 
> only handles two scenarios:
> 1. When there is no key, the partition comes from the sticky partition cache 
> (effectively random)
> 2. When there is a key, the partition is the hash of the key modulo the number 
> of partitions
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // Random when there is no key
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector does not actually have a round-robin strategy. But 
> we can borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>     public void configure(Map<String, ?> configs) {}
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>     public void close() {}
> } {code}
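
One detail of the quoted partitioner that is easy to miss is why the counter 
goes through Utils.toPositive before the modulo. The sketch below reproduces 
that step with the JDK only. CounterWrapDemo and pick are hypothetical names, 
and toPositive is re-implemented here as a sign-bit mask on the assumption 
that it matches Kafka's helper.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo of the counter-to-partition step, not Kafka code. */
class CounterWrapDemo {
    // Assumed equivalent of Kafka's Utils.toPositive: clears the sign bit.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Maps the next counter value to a partition index in [0, numPartitions).
    static int pick(AtomicInteger counter, int numPartitions) {
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }
}
```

Because an int counter eventually overflows to negative values, a plain 
modulo would produce a negative index. Masking keeps the index in range, 
although the rotation restarts at the overflow point rather than continuing 
the exact cycle.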



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
