Theo Diefenthal created FLINK-19383:
---------------------------------------
Summary: Per Partition State
Key: FLINK-19383
URL: https://issues.apache.org/jira/browse/FLINK-19383
Project: Flink
Issue Type: Improvement
Reporter: Theo Diefenthal
With Kafka possibly being the most widely used data source in Flink, I'd like to
propose a new "per-partition state".
Right now, Flink only knows about OperatorState (evenly distributed or union)
and KeyedState.
Kafka has multiple partitions per topic, and Flink already exploits that
nicely. The most widely used feature is that one can produce data with
ascending timestamps per Kafka partition (e.g. server logs, with each server
sending data to one partition). In Flink, this enables a huge optimization:
in that case, one can use an AscendingTimestampWatermarkAssigner,
and windows can be closed quickly.
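To illustrate why ascending timestamps per partition let windows close quickly, here is a minimal plain-Java sketch with no Flink dependency. The class PerPartitionWatermarkSketch and its methods are hypothetical names, and the logic is simplified: Flink's own watermark generators differ in detail (for example, they also handle idle partitions).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not a Flink class: tracks ascending timestamps per Kafka partition. */
public class PerPartitionWatermarkSketch {
    // Highest event timestamp seen so far for each partition.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();
    private final int partitionCount;

    public PerPartitionWatermarkSketch(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    /** Records an event; timestamps are assumed to ascend within one partition. */
    public void onEvent(int partition, long timestampMillis) {
        maxTimestampPerPartition.merge(partition, timestampMillis, Math::max);
    }

    /**
     * The combined watermark is the minimum of the per-partition maxima.
     * Returns Long.MIN_VALUE until every partition has reported at least once.
     */
    public long currentWatermark() {
        if (maxTimestampPerPartition.size() < partitionCount) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerPartition.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }
}
```

Because no partition ever delivers an older timestamp than it already has, the watermark can follow the data without any out-of-orderness allowance.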
Making use of this performance optimization leads me to think that we could
go a step further and introduce a per-Kafka-partition state. In my current
scenario, I need to buffer the data per server (and thus per Kafka partition)
for 1 minute in event time, waiting to see whether certain other events
arrive during that time.
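As a rough sketch of the kind of state meant here, the following plain-Java class keeps one buffer per partition and releases events once that partition's watermark has advanced 1 minute past them. All names (PerPartitionBufferSketch, Event, HOLD_MILLIS) are hypothetical, nothing here is a Flink API, and the check for the "certain other events" is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of per-partition buffering; not a Flink API. */
public class PerPartitionBufferSketch {
    // Events are held for 1 minute of event time.
    static final long HOLD_MILLIS = 60_000L;

    /** Minimal event record carrying its source partition and event timestamp. */
    public static final class Event {
        final int partition;
        final long timestampMillis;
        final String payload;

        public Event(int partition, long timestampMillis, String payload) {
            this.partition = partition;
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }
    }

    // One FIFO buffer per Kafka partition; events arrive in timestamp order.
    private final Map<Integer, Deque<Event>> buffers = new HashMap<>();

    public void add(Event e) {
        buffers.computeIfAbsent(e.partition, p -> new ArrayDeque<>()).addLast(e);
    }

    /** Releases the events of one partition that are at least HOLD_MILLIS behind its watermark. */
    public List<Event> release(int partition, long partitionWatermarkMillis) {
        List<Event> out = new ArrayList<>();
        Deque<Event> queue = buffers.get(partition);
        if (queue == null) {
            return out;
        }
        while (!queue.isEmpty()
                && queue.peekFirst().timestampMillis + HOLD_MILLIS <= partitionWatermarkMillis) {
            out.add(queue.pollFirst());
        }
        return out;
    }
}
```

With a per-partition state, a structure like this could live directly in the source subtask that already reads the partition, so no keying would be needed.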
A state per Kafka partition is currently hard to implement. The best one can do
is to keyBy the DataStream by Kafka partition. However, the KafkaAssigner has
different assignment characteristics than the KeyGroupRangeAssignment, leading
to an unnecessary shuffle step. Even worse, the KeyGroupRangeAssignment is more
or less random, whereas the Kafka partition assignment in the source works
round-robin. Even with similarly loaded Kafka partitions, the load can be
skewed across the TaskManagers after keying. For a simple pipeline with
parallelism 3 and 3 partitions, this can lead to e.g. one TaskManager
processing 2 partitions, one TaskManager processing 1 partition, and one
TaskManager being idle.
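The mismatch between the two assignment schemes can be sketched in plain Java. This is an illustration under assumptions, not Flink code: the class and method names are hypothetical, the round-robin formula reflects my understanding of the Kafka connector's assigner, and mix() is a stand-in hash, so the exact keyed placement will not match what Flink computes.

```java
import java.util.Arrays;

/** Illustrative sketch only; names and the hash function are assumptions, not Flink code. */
public class PartitionAssignmentSketch {

    /** Round-robin placement of Kafka partitions, starting at a topic-dependent offset. */
    static int sourceSubtask(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    /** Stand-in bit-mixing hash; assumption: any well-mixing hash shows the same effect. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7FFFFFFF; // force a non-negative result
    }

    /** Key -> key group -> subtask, the general scheme behind key-group based assignment. */
    static int keyedSubtask(int key, int maxParallelism, int parallelism) {
        int keyGroup = mix(Integer.hashCode(key)) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    /** Counts how many partitions land on each subtask under the chosen scheme. */
    static int[] load(String topic, int partitions, int parallelism, int maxParallelism, boolean keyed) {
        int[] perSubtask = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            int subtask = keyed
                    ? keyedSubtask(p, maxParallelism, parallelism)
                    : sourceSubtask(topic, p, parallelism);
            perSubtask[subtask]++;
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // "logs" and maxParallelism 128 are example values.
        System.out.println("source: " + Arrays.toString(load("logs", 3, 3, 128, false)));
        System.out.println("keyed:  " + Arrays.toString(load("logs", 3, 3, 128, true)));
    }
}
```

The round-robin scheme always places 3 partitions on 3 distinct subtasks, while the hash-based scheme gives no such guarantee, which is where the skew after keying comes from.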