[ https://issues.apache.org/jira/browse/FLINK-19382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201968#comment-17201968 ]

Yun Tang commented on FLINK-19382:
----------------------------------

Thanks for bringing up this discussion. If you read 
[FLIP-142|https://cwiki.apache.org/confluence/display/FLINK/FLIP-142%3A+Disentangle+StateBackends+from+Checkpointing]
 and try to implement a state backend, you will see that a state backend 
consists of three parts:
1) an operator state backend, 2) a keyed state backend, and 3) checkpoint 
storage. Which of these components would the ReplayableSourceStateBackend you 
want to introduce mainly change?

If you want to introduce a new keyed state backend, what would {{currentKey}} 
be for your source?
If I understand correctly, what you mainly want to store is the Kafka offset, 
and you use the state backend not for state access but mainly for fault 
tolerance. If so, I think a custom operator that uses the operator state 
backend might meet your requirements.
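To make the operator-state suggestion concrete, here is a minimal plain-Java model (no Flink dependency) of what such a custom operator would keep in operator state: only the next offset per Kafka partition. The class `OffsetOnlyState`, the record `PartitionOffset` and their methods are hypothetical; in Flink itself this role would be played by a function implementing `CheckpointedFunction` with an operator `ListState`, whose `snapshotState`/`initializeState` hooks the two methods below only loosely mirror.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a custom operator whose operator state holds only
// Kafka offsets. The hooks loosely mirror Flink's CheckpointedFunction
// (snapshotState / initializeState) but use no Flink classes.
final class OffsetOnlyState {
    // One operator-state entry: a partition and the next offset to read from it.
    record PartitionOffset(int partition, long nextOffset) {}

    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    // Called for every record this instance processes.
    void onRecord(int partition, long offset) {
        nextOffsets.put(partition, offset + 1);
    }

    // The snapshot size depends on the number of partitions, not on how much
    // data the job has buffered. A flat list of entries can be split across
    // parallel instances on rescaling, as operator ListState is.
    List<PartitionOffset> snapshotState() {
        List<PartitionOffset> entries = new ArrayList<>();
        nextOffsets.forEach((p, o) -> entries.add(new PartitionOffset(p, o)));
        return entries;
    }

    // Restores from a (possibly redistributed) list of entries.
    void initializeState(List<PartitionOffset> restored) {
        nextOffsets.clear();
        for (PartitionOffset e : restored) {
            nextOffsets.put(e.partition(), e.nextOffset());
        }
    }

    Long nextOffset(int partition) {
        return nextOffsets.get(partition);
    }

    public static void main(String[] args) {
        OffsetOnlyState op = new OffsetOnlyState();
        op.onRecord(0, 41);
        op.onRecord(1, 7);
        op.onRecord(0, 42);

        OffsetOnlyState restored = new OffsetOnlyState();
        restored.initializeState(op.snapshotState());
        System.out.println(restored.nextOffset(0) + " " + restored.nextOffset(1)); // 43 8
    }
}
```

The state stays a handful of numbers regardless of throughput, which is the property the proposal is after; whatever else the operator buffers would then have to be rebuilt some other way on restore.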

Could you give more details about the internal data structures of the state 
backend so that we can get a clearer view?

> Introducing ReplayableSourceStateBackend
> ----------------------------------------
>
>                 Key: FLINK-19382
>                 URL: https://issues.apache.org/jira/browse/FLINK-19382
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Theo Diefenthal
>            Priority: Major
>
> I had the idea for a new StateBackend simply called "ReplayableSource". This
> state backend would be bound by a number of limitations, but in the few areas
> where it applies, it could improve pipeline performance by orders of
> magnitude, which makes me think it's worth debating.
> I'd like to start by describing two scenarios where such a state backend would
> be useful before discussing the backend itself. In both scenarios, data is
> read from Kafka and processed with Flink.
> Scenario 1: Buffering data. I'm currently developing a pipeline where,
> directly after reading the data, I need to buffer it for 1 minute in event
> time. This is due to inter-event dependencies: if a certain event arrives
> within one minute, I have to enrich the first event with its information.
> Right now, I store that 1 minute of event-time data in Flink state, so the
> entire buffer is written on every checkpoint, which makes it impossible for
> me to use exactly-once processing (we want latency as low as possible, i.e.
> at most a few seconds beyond the buffering, while simultaneously handling a
> high volume of data).
> Scenario 2: Performing Flink SQL CEP queries. CEP queries naturally have
> inter-event dependencies. Simple MATCH_RECOGNIZE queries placed directly
> after a Kafka source often end up requiring the RocksDB state backend and
> perform slowly.
>  
> The idea: instead of storing the entire state, we could simply store a Kafka
> offset. When restoring from a savepoint, all the state could be rebuilt by
> re-reading the data from Kafka. The checkpoint size would thus shrink from
> huge sizes down to just a few numbers, which allows frequent and fast
> checkpointing.
>  
> Limitations:
>  * This would only work for fully deterministic/replayable streaming jobs. If
> a certain operator within the pipeline is not deterministic, a replay could
> produce a different result.
>  * The source must be replayable, e.g. Kafka.
>  * This would also only work for pipelines with short-lived state. Many
> pipelines build up their state over days, months or even years. Restoring
> such state by replaying all the data over that time would be almost
> impossible, especially as Kafka retention is usually configured to something
> like a week. However, there are also many queries with short-lived state,
> like the CEP use case mentioned above, where the event-correlation patterns
> are usually defined over time frames of seconds, minutes, hours or a few days.
>  * I'm not sure whether there are further limitations with regard to
> windowing/watermarks or similar things that would make this feature
> impossible.
> For certain scenarios, this feature would obviously be a bad idea, most likely
> for windowed pipelines. It is certainly much cheaper to, e.g., store a COUNT
> per window than to replay all events of a window in order to restore that
> COUNT. But I'm focusing on something like CEP.
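The replay idea from the quoted description, applied to the 1-minute buffer of Scenario 1, can be sketched as follows under loudly stated assumptions: a `List` stands in for a single Kafka partition, event time grows monotonically with the offset, and processing is deterministic (the first limitation above). The class `ReplayBuffer` and its methods are hypothetical, not a Flink API. The checkpoint holds two numbers: the offset of the oldest still-buffered event, where replay must start, and the next offset to read. On restore, the buffer is rebuilt by re-reading that range with output suppressed, since those results were already emitted before the checkpoint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of "checkpoint an offset, rebuild state by replay".
// ASSUMPTIONS: one partition (a List), event time monotonic in the offset,
// deterministic processing.
final class ReplayBuffer {
    record Event(long offset, long timestamp, String payload) {}

    // What a checkpoint holds: two numbers instead of the buffered events.
    record Checkpoint(long replayFrom, long nextOffset) {}

    private final long bufferMillis;
    private final Deque<Event> buffer = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private long nextOffset = 0;

    ReplayBuffer(long bufferMillis) { this.bufferMillis = bufferMillis; }

    // Buffers the event and emits every event older than bufferMillis.
    // During replay (emit == false) nothing is emitted again.
    void process(Event e, boolean emit) {
        buffer.addLast(e);
        nextOffset = e.offset() + 1;
        while (buffer.peekFirst().timestamp() < e.timestamp() - bufferMillis) {
            Event old = buffer.pollFirst();
            if (emit) emitted.add(old.payload());
        }
    }

    // Replay must start at the oldest buffered event, not at the read position.
    Checkpoint snapshot() {
        long replayFrom = buffer.isEmpty() ? nextOffset : buffer.peekFirst().offset();
        return new Checkpoint(replayFrom, nextOffset);
    }

    // Rebuilds the buffer by re-reading [replayFrom, nextOffset) from the log.
    static ReplayBuffer restore(List<Event> log, Checkpoint cp, long bufferMillis) {
        ReplayBuffer op = new ReplayBuffer(bufferMillis);
        op.nextOffset = cp.nextOffset();
        for (long o = cp.replayFrom(); o < cp.nextOffset(); o++) {
            op.process(log.get((int) o), false);
        }
        return op;
    }

    int buffered() { return buffer.size(); }
    List<String> emitted() { return emitted; }

    public static void main(String[] args) {
        List<Event> log = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            log.add(new Event(i, i * 20_000L, "e" + i)); // one event every 20 s
        }
        ReplayBuffer op = new ReplayBuffer(60_000L); // buffer 1 minute
        for (int i = 0; i < 7; i++) op.process(log.get(i), true);
        Checkpoint cp = op.snapshot();
        System.out.println(cp.replayFrom() + " " + cp.nextOffset()); // 3 7

        ReplayBuffer restored = restore(log, cp, 60_000L);
        restored.process(log.get(7), true);
        System.out.println(restored.emitted()); // [e3]
    }
}
```

The replay range is as long as the buffer, which is why the approach only pays off for short-lived state. With out-of-order event time, the watermark would also have to be restored or replayed with care; the sketch does not handle that case.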



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
