[ 
https://issues.apache.org/jira/browse/FLINK-19383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203126#comment-17203126
 ] 

Aljoscha Krettek commented on FLINK-19383:
------------------------------------------

I think this is too specific to be implemented in Flink proper. And I also 
don't see how we could implement this with bigger changes to how state is 
maintained. The most immediate problem I see is how to keep the correlation 
between "Kafka Reader Operator" and "Kafka Partition State Operator" in sync, 
the reader for a given Kafka partition might move because of rebalancing or 
upon job restart. If this happens, we would need to make sure that the Kafka 
Partition state also moves to the correct subtask.

In general, most Flink developers that think about state in Flink consider 
adding operator state (and especially union state) to have been a mistake 
because it complicates state handling. Especially when you want to have a 
system that can dynamically scale in and out. Keyed state is "simple" to work 
with because we can move individual key partitions around and also change the 
record partitioning to match that.

I do agree, that you have a valid use case, though, but I think there are other 
possible solutions. Semantically, what you want to express is similar to a 
1-minute window, but not quite. Currently, even if you did use windows, this 
would always incur a full shuffle but could be solved by Flink if we did add a 
Hadoop-style combine step for windowing that aggregates data locally on 
upstream tasks before sending them down the shuffle.

> Per Partition State
> -------------------
>
>                 Key: FLINK-19383
>                 URL: https://issues.apache.org/jira/browse/FLINK-19383
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Theo Diefenthal
>            Priority: Major
>
> With Kafka possibly being the mostly used data source in Flink, I'd like to 
> propse a new "per-partition-state".
> Right now, Flink only knows about OperatorState (evenly distributed or union) 
> or keyedState.
> With Kafka having multiple partitions per topic, Flink already exploits that 
> nicely. Most widely used is the feature that one can produce data with 
> ascending timestamps per kafka partition. (e.g. server logs with one server 
> sending data to one partition). In Flink, this results in a huge optimization 
> namingly that in that case, one can use an 
> AscendingTimestampWatermarkAssigner and windows can be closed quickly. 
> Making use of the performance optimization leads me to thinking that we could 
> go a step further and introduce a per-kafka-partition state. In my current 
> scenario, I need to buffer the data per server (and thus per kafka partition) 
> for 1 minute in event time, waiting if during that time certain other events 
> arrive or not.
> A state per kafka partition is currently hard to implement. The best to do is 
> keyby the datastream by kafka-partition. However, the KafkaAssigner has 
> different assignment characteristics then the KeyGroupRangeAssignment leading 
> to an unnecessary shuffle step. Even worse, the KeyGroupRangeAssignment is 
> kind of random whereas the kafka-partition assignment from source works round 
> robing. Having similarly loaded kafka-partitions, after keying, the load can 
> be skewed on the taskmanagers. For a simple pipeline with parallelism 3 and 3 
> partitions, this can lead to e.g. one taskManager processing 2 partitions, 
> one taskmanager 1 partition and one taskManager being idle.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to