Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5108#discussion_r154363656
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
    @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
                        partitions != null && partitions.length > 0,
                        "Partitions of the target topic is empty.");
     
    -           return partitions[parallelInstanceId % partitions.length];
    +           if (topicToFixedPartition.containsKey(targetTopic)) {
    --- End diff --
    
    It might be that the "fixed partitioner" is hard to implement or that its 
semantics are somewhat strange. Not sure though. What's the default Kafka 
behaviour? Maybe we should also have that as the default behaviour. 


---

Reply via email to