[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489342#comment-17489342
 ] 

shizhengchao commented on FLINK-26033:
--------------------------------------

[~MartijnVisser] I made a mistake. I re-checked: the documentation only 
applies to older Kafka client versions, such as kafka-client-2.0.1, whereas 
the problem I'm describing occurs with kafka-client-2.4.1. 
{code:java}
//kafka-2.0.1
/**
 * Chooses the partition for a record.
 *
 * <p>With a serialized key, the murmur2 hash of the key modulo the topic's
 * partition count is used. Without a key, the value from
 * {@code nextValue(topic)} is mapped onto the available partitions, or onto
 * all partitions when none is available.
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value,
        byte[] valueBytes, Cluster cluster) {
    final int partitionCount = cluster.partitionsForTopic(topic).size();

    // Keyed record: hash the key bytes to choose a partition.
    if (keyBytes != null) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionCount;
    }

    final int counter = nextValue(topic);
    final List<PartitionInfo> candidates =
            cluster.availablePartitionsForTopic(topic);
    final int slot = Utils.toPositive(counter);
    if (candidates.isEmpty()) {
        // No partitions are available, so give a non-available partition.
        return slot % partitionCount;
    }
    return candidates.get(slot % candidates.size()).partition();
}

// kafka-2.4.1
/**
 * Chooses the partition for a record.
 *
 * <p>Keyed records use the murmur2 hash of the serialized key modulo the
 * topic's partition count; records without a key are delegated to
 * {@code stickyPartitionCache}.
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value,
        byte[] valueBytes, Cluster cluster) {
    if (keyBytes != null) {
        // Hash the key bytes to choose a partition.
        final int partitionCount = cluster.partitionsForTopic(topic).size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionCount;
    }
    return stickyPartitionCache.partition(topic, cluster);
}

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set 
or that the partition that 
    // triggered the new batch matches the sticky partition that needs to be 
changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = 
cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = 
Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = 
Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % 
availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition 
matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}

{code}

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26033
>                 URL: https://issues.apache.org/jira/browse/FLINK-26033
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.3, 1.14.3
>            Reporter: shizhengchao
>            Priority: Major
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> // code placeholder
> /**
>  * Resolves the SINK_PARTITIONER table option to a Flink Kafka partitioner.
>  *
>  * Mapping visible below: the fixed value yields a FlinkFixedPartitioner; the
>  * default and round-robin values both yield Optional.empty(); any other
>  * value is passed to initializePartitioner together with the class loader.
>  *
>  * @param tableOptions options from which SINK_PARTITIONER is read
>  * @param classLoader forwarded to initializePartitioner for other values
>  * @return the partitioner, or empty for the default and round-robin values
>  */
> public static Optional<FlinkKafkaPartitioner<RowData>> 
> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new 
> FlinkFixedPartitioner<>());
>                             // 'default' and 'round-robin' share one branch:
>                             // no Flink partitioner is returned for either.
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                                 // Default fallback to full class name of the 
> partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, 
> classLoader));
>                         }
>                     });
> } {code}
> They both use Kafka's default partitioner, but DefaultPartitioner actually 
> has only two partitioning scenarios:
> 1. Random when there is no key
> 2. When there is a key, take the modulo according to the key
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> /**
>  * Chooses the partition for a record: records without a serialized key are
>  * delegated to stickyPartitionCache, keyed records use the murmur2 hash of
>  * the key modulo the topic's partition count.
>  */
> public int partition(String topic, Object key, byte[] keyBytes, Object value, 
> byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // Random when there is no key        
>         return stickyPartitionCache.partition(topic, cluster);
>     } 
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector does not have a round-robin strategy. But we can 
> borrow from Kafka's RoundRobinPartitioner.
> {code:java}
> // code placeholder
> /**
>  * Partitioner that ignores the record key and cycles through a topic's
>  * partitions using a per-topic counter.
>  */
> public class RoundRobinPartitioner implements Partitioner {
>     // One counter per topic; entries are created on first use by nextValue.
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new 
> ConcurrentHashMap<>();
>     // No configuration is read.
>     public void configure(Map<String, ?> configs) {}
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object 
> value, byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = 
> cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             // Map the counter onto the currently available partitions.
>             int part = Utils.toPositive(nextValue) % 
> availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>     /**
>      * Returns the topic's counter value before incrementing it; the counter
>      * is created starting at 0 the first time a topic is seen.
>      */
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>     // Nothing to release.
>     public void close() {}
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to