[
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584046#comment-16584046
]
ASF GitHub Bot commented on FLINK-9610:
---------------------------------------
nielsbasjes commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base]
Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-413893719
@tzulitai
I did some more digging and there is in fact a good reason for this
patch.
Have a look at
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
There you'll find constructors with essentially the following variations in
parameters:
- Either a SerializationSchema or a KeyedSerializationSchema
- Either a FlinkKafkaPartitioner or no partitioner, in which case
FlinkFixedPartitioner is actually used as the default.
Looking at all of these constructors:
1. If you do not specify a partitioner then all constructors use the
FlinkFixedPartitioner.
2. If you do specify a partitioner then it will use that partitioner.
Even the constructor that takes a KeyedSerializationSchema will NOT use that
key for partitioning (which, as we saw in production, caused problems).
Essentially the current FlinkKafkaProducer API makes it very hard to 'not'
specify a partitioner and instead use the hash(key) partitioning that is
natively present in Kafka.
So given the current API, we came to the conclusion that an extra partitioner
is needed.
Because the Flink API works this way, I never looked deeper into the base
code to see how it really works below the surface.
Given what I understand now I see two viable ways forward:
1. We change the behavior of the API so that if a KeyedSerializationSchema
is used, the hash of the key will be used by Kafka to partition the data.
This is however an impactful change in the way the API behaves, i.e. a
breaking API change.
2. We simply add the partitioner I created.
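For readers following along, the core of such a key-hash partitioner can be
sketched as below. This is a hypothetical, simplified sketch, not the actual
code from the pull request: the class and method names are illustrative, the
real implementation would subclass Flink's FlinkKafkaPartitioner (whose
partition method also receives the record, value, and target topic), and
Kafka's own DefaultPartitioner uses murmur2 on the serialized key rather than
Arrays.hashCode.

```java
import java.util.Arrays;

// Hypothetical sketch of a key-hash partitioner.
// Assumption: the partitioner receives the serialized key bytes and the
// array of available partition ids, mirroring FlinkKafkaPartitioner#partition.
public class KeyHashPartitionerSketch {

    public static int partition(byte[] key, int[] partitions) {
        if (key == null) {
            // No key: fall back to the first partition (a design choice
            // for the sketch; round-robin would be another option).
            return partitions[0];
        }
        // Mask off the sign bit so the index is never negative.
        // (Kafka's DefaultPartitioner uses murmur2 here instead.)
        int hash = Arrays.hashCode(key) & 0x7fffffff;
        return partitions[hash % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        byte[] keyA = "user-42".getBytes();
        // The point of the patch: the same key always lands on the same
        // partition, so records with equal keys stay together.
        System.out.println(
            partition(keyA, partitions) == partition(keyA, partitions));
    }
}
```

The key property this gives you, which FlinkFixedPartitioner does not, is
that records sharing a key are deterministically routed to the same Kafka
partition regardless of which Flink subtask produces them.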
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add Kafka partitioner that uses the key to partition by
> -------------------------------------------------------
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector
> Reporter: Niels Basjes
> Assignee: Niels Basjes
> Priority: Major
> Labels: pull-request-available
>
> The kafka connector package only contains the FlinkFixedPartitioner
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make
> this a lot easier for others to use and extend.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)