[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584046#comment-16584046 ]

ASF GitHub Bot commented on FLINK-9610:
---------------------------------------

nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] 
Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-413893719
 
 
   @tzulitai
I did some more digging, and there is in fact a good reason for this 
patch.
   
   Have a look at 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
   
   There you'll find constructors with essentially the following variations in 
parameters:
   
- Either a SerializationSchema or a KeyedSerializationSchema
   - Either an explicit FlinkKafkaPartitioner, or no partitioner at all, in which 
case it falls back to FlinkFixedPartitioner as the default.
   
Looking at all of these constructors (sketched below):
   
   1. If you do not specify a partitioner, then all constructors use the 
FlinkFixedPartitioner.
   2. If you do specify a partitioner, then that partitioner is used.
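   
   To make this concrete, the two "no partitioner" constructor variants boil down 
to roughly the following (a simplified sketch from memory, with trimmed parameter 
lists; not the exact signatures in FlinkKafkaProducer011):
   
   ```java
   // Simplified sketch of two FlinkKafkaProducer011 constructor variants
   // (parameter lists trimmed; not the exact signatures).
   
   // Unkeyed schema, no partitioner given -> FlinkFixedPartitioner:
   public FlinkKafkaProducer011(
           String topicId,
           SerializationSchema<IN> serializationSchema,
           Properties producerConfig) {
       this(topicId, serializationSchema, producerConfig,
               Optional.of(new FlinkFixedPartitioner<IN>()));
   }
   
   // Keyed schema, no partitioner given -> STILL FlinkFixedPartitioner;
   // the key bytes produced by the schema are NOT used for partitioning:
   public FlinkKafkaProducer011(
           String topicId,
           KeyedSerializationSchema<IN> serializationSchema,
           Properties producerConfig) {
       this(topicId, serializationSchema, producerConfig,
               Optional.of(new FlinkFixedPartitioner<IN>()));
   }
   ```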
   
   Even the constructor that takes a KeyedSerializationSchema will NOT use that 
key for partitioning (which caused real problems for us in production).
   Essentially, the current FlinkKafkaProducer API makes it very hard to 'not' 
specify a partitioner and instead get the hash(key) partitioning that is natively 
present in Kafka.
   
   So given the current API, we came to the conclusion that an extra partitioner 
is needed.
   
   Because the Flink API works this way, I had never looked deeper into the base 
code to see what really happens below the surface.
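   
   Concretely, the partitioner in the PR boils down to something like the 
following (a minimal sketch; the class name here is illustrative, and the hash 
function is only an example — Kafka's own DefaultPartitioner uses murmur2 rather 
than Java's Arrays.hashCode):
   
   ```java
   import java.util.Arrays;
   
   import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
   
   // Illustrative sketch of a key-hash partitioner; not the exact PR code.
   public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
       @Override
       public int partition(T record, byte[] key, byte[] value,
                            String targetTopic, int[] partitions) {
           if (key == null) {
               // No key available: fall back to the first partition.
               return partitions[0];
           }
           // Mask the sign bit so the modulo result is a valid index.
           return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
       }
   }
   ```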
   
   Given what I understand now, I see two viable ways forward:
   
   1. We change the behavior of the API so that, when a KeyedSerializationSchema 
is used, the hash of the key is used by Kafka to partition the data. This is, 
however, an impactful change in the way the API behaves, i.e. it breaks the API.
   2. We simply add the partitioner I created (usage sketched below).
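   
   With option 2, the call site would then look roughly like this (hypothetical 
names: MyEvent, myKeyedSchema, producerProps; the partitioner class is the sketch 
from above):
   
   ```java
   // Hypothetical names: MyEvent, myKeyedSchema, producerProps.
   FlinkKafkaProducer011<MyEvent> producer = new FlinkKafkaProducer011<>(
           "my-topic",
           myKeyedSchema,   // KeyedSerializationSchema<MyEvent>: produces the key bytes
           producerProps,
           Optional.of(new KeyHashPartitioner<MyEvent>()));
   ```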
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Kafka partitioner that uses the key to partition by
> -------------------------------------------------------
>
>                 Key: FLINK-9610
>                 URL: https://issues.apache.org/jira/browse/FLINK-9610
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>            Priority: Major
>              Labels: pull-request-available
>
> The Kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
