[ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-----------------------------
    Description: 
The native interval join relies on the state backend (e.g. RocksDB) to insert and 
fetch the left and right buffers. This design choice minimizes heap memory 
footprint, but it bounds the processing throughput of a single taskmanager to the 
RocksDB access speed (IOPS). Here at Pinterest, we have some large use cases where 
developers join a large, slowly evolving data stream (e.g. post updates in the 
last 28 days) with a web-traffic data stream (e.g. post views up to 28 days after 
a given update).

This poses some challenges to the current implementation of the interval join:
 * The partitioned RocksDB instance needs to keep both updates and views for 28 
days. The large buffer (especially on the view stream side) slows RocksDB down, 
and overall interval join performance degrades quickly as state builds up.

 * The view stream is web scale; even with a large parallelism it can put a lot 
of pressure on each subtask and backpressure the entire job.

In the proposed implementation, we plan to introduce two changes:
 * Support a ProcessJoinFunction setting to opt in to an earlier cleanup time for 
the right stream (e.g. the view stream does not have to stay in the buffer for 28 
days waiting for the update stream to join, since in event-time semantics the 
related post views happen after the update). This optimization can reduce state 
size and improve RocksDB throughput. In the extreme case, the user can opt in to 
an in-flight join and skip the write into the right (view) stream buffer entirely, 
saving the IOPS budget on each subtask.

 * Support a ProcessJoinFunction setting to expedite the keyed lookup against the 
slowly changing stream. Instead of every post view pulling post updates from 
RocksDB, the user can opt in to keep a one-side buffer cache in memory. When a 
post update arrives, the cache loads recent views from the right buffer and uses 
a sortedMap to find the matching buckets; when a post view arrives, the cache 
loads recent updates from the left buffer into memory. When another view for that 
post arrives, Flink then saves the cost of a RocksDB access (see the sketch below).
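
A minimal, self-contained sketch (plain Java) of the one-side sorted cache idea 
from the second bullet above. The OneSideSortedCache class and its method names 
are illustrative placeholders, not existing Flink APIs; the intent is only to show 
how a per-key TreeMap over event timestamps turns the interval lookup into a cheap 
in-memory range query instead of a RocksDB scan.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: an in-memory, sorted view of one side's buffer for a single join key.
// In the real operator this would be populated from the RocksDB-backed buffer state;
// here it is plain Java just to illustrate the range lookup.
class OneSideSortedCache<T> {

    // event timestamp -> elements buffered at that timestamp
    private final TreeMap<Long, List<T>> sorted = new TreeMap<>();

    // Build (or rebuild) the cache for one join key from the other side's buffer.
    void rebuild(Iterable<Map.Entry<Long, List<T>>> otherBufferForKey) {
        sorted.clear();
        for (Map.Entry<Long, List<T>> bucket : otherBufferForKey) {
            sorted.put(bucket.getKey(), new ArrayList<>(bucket.getValue()));
        }
    }

    // Range lookup: all buffered elements whose timestamp falls inside the join
    // interval [ts + lowerBound, ts + upperBound], answered entirely from memory.
    List<T> lookup(long ts, long lowerBound, long upperBound) {
        List<T> matches = new ArrayList<>();
        sorted.subMap(ts + lowerBound, true, ts + upperBound, true)
              .values()
              .forEach(matches::addAll);
        return matches;
    }
}
{code}

With such a cache in place, a recurring lookup for the same key (e.g. repeated 
views of the same post) is served from memory; only the first access per key, and 
rebuilds after invalidation, pay the RocksDB read cost.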

  was:
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following pattern:
 * the left stream is pulled from a low-QPS source
 * the lookup time range from the right stream into the left stream is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB 
fetches and updates get more expensive, performance takes a hit, which blocks the 
large use cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to more aggressive right-buffer cleanup
 ** allow cleaning up the right stream earlier than the interval upper bound
 * leverage an in-memory cache that builds a sortedMap on demand from the other 
side's buffer for each join key (see the sketch after the open questions below); 
in our use cases this helps to
 ** expedite right-stream lookups of the left buffer without accessing RocksDB 
every time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, clean up the cache and reload it 
from the right side
 ** in the worst-case scenario, the two streams alternate processElement1 and 
processElement2 round-robin for the same set of keys at the same frequency; 
performance is then expected to be similar to the current implementation, and the 
memory footprint is bounded by 1/2 of the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache?
 ** if a given key sees an insertion from the other side, the cache is cleared 
for that key and rebuilt
 * what happens on checkpoint/restore?
 ** state still persists in the state backend; the cache is cleared and rebuilt 
for each newly seen key
 * how is performance?
 ** given the assumption that RAM is an order of magnitude faster than disk, 
populating the cache is a small overhead (<5%); compared with the current RocksDB 
implementation, where we do a full loop over the buffer on every event, it saves 
on the bucket scan logic. If a key is accessed more than once in the same 
direction while cached, we expect a significant performance gain.
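
To make the cache-size and dirty-cache answers above concrete, here is a minimal 
plain-Java sketch assuming the default "cache size = 1 key" policy: the cache 
holds the sorted buffer of one side for a single join key, is invalidated whenever 
the other side inserts for that key, and is rebuilt lazily from the state backend 
on the next lookup. SingleKeySideCache and reloadFromBuffer are illustrative 
names, not Flink APIs.

{code:java}
import java.util.List;
import java.util.TreeMap;
import java.util.function.Function;

// Illustrative sketch of the "cache size = 1 key" policy with dirty-cache handling.
// reloadFromBuffer stands in for a read of the RocksDB-backed buffer for that key.
class SingleKeySideCache<K, T> {

    private K cachedKey;                          // the single join key currently cached
    private TreeMap<Long, List<T>> sortedBuffer;  // ts -> elements from the other side

    // Lookup for `key`; rebuilds the cache from the state backend on a miss.
    TreeMap<Long, List<T>> getOrLoad(K key, Function<K, TreeMap<Long, List<T>>> reloadFromBuffer) {
        if (!key.equals(cachedKey)) {
            sortedBuffer = reloadFromBuffer.apply(key);  // miss or different key: rebuild
            cachedKey = key;
        }
        return sortedBuffer;
    }

    // Called when the *other* side inserts an element for `key`: the cached view
    // is now stale, so drop it and rebuild lazily on the next lookup.
    void invalidate(K key) {
        if (key.equals(cachedKey)) {
            cachedKey = null;
            sortedBuffer = null;
        }
    }
}
{code}

On checkpoint/restore nothing extra needs to be snapshotted: the buffers live in 
the state backend, so the cache simply starts empty after a restore and is 
repopulated on the next lookup for each key.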

 


> oneside sorted cache in intervaljoin
> ------------------------------------
>
>                 Key: FLINK-16392
>                 URL: https://issues.apache.org/jira/browse/FLINK-16392
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: Chen Qin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
