[ 
https://issues.apache.org/jira/browse/FLINK-8181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274125#comment-16274125
 ] 

ASF GitHub Bot commented on FLINK-8181:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5108#discussion_r154293212
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 ---
    @@ -68,6 +78,13 @@ public int partition(T record, byte[] key, byte[] value, 
String targetTopic, int
                        partitions != null && partitions.length > 0,
                        "Partitions of the target topic is empty.");
     
    -           return partitions[parallelInstanceId % partitions.length];
    +           if (topicToFixedPartition.containsKey(targetTopic)) {
    --- End diff --
    
    The full solution would be to allow FlinkKafkaPartitioner to be stateful 
and participate in checkpointing.
    That might mean passing in the runtime context in 
FlinkKafkaPartitioner.open(), but that would be touching the user API.
    
    We could also opt for a workaround to only register state for the 
FlinkFixedPartitioner internally, and still restrict custom partitioners to be 
stateless.
    
    However, I think it would make more sense to avoid workarounds at this 
stage.



> Kafka producer's FlinkFixedPartitioner returns different partitions when a 
> target topic is rescaled
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8181
>                 URL: https://issues.apache.org/jira/browse/FLINK-8181
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.5.0
>
>
> On fixing FLINK-6288 and migrating the original {{FlinkFixedPartitioner}} to 
> the new partitioning API (commit 9ed9b68397b51bfd2b0f6e532212a82f771641bd), 
> the {{FlinkFixedPartitioner}} no longer returns identical target partitions 
> once a target topic is rescaled.
> This results in a behavioral regression when the {{FlinkFixedPartitioner}} is 
> used.
> The {{FlinkFixedPartitionerTest}} should also be strengthened to cover the 
> target topic rescaling case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to