[ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-----------------------------
    Description: 
IntervalJoin is seeing a lot of use cases on our side. Those use cases share 
the following pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, both streams are treated equally 
and keyed state is pulled and scanned on demand from the state backend (RocksDB 
here). As RocksDB fetches and updates become more expensive, performance takes 
a hit, which blocks large use cases.
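
For context, a minimal sketch of the on-demand scan pattern described above, 
assuming the other side's buffer is viewed as a timestamp-sorted map 
(illustrative only, not the actual IntervalJoinOperator source):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.BiConsumer;

// Illustrative sketch of the current behavior as described above: every
// incoming element scans the other side's buffer over the full interval,
// which turns into state-backend (RocksDB) reads on every event.
final class OnDemandScanSketch<L, R> {

    void onRightElement(long ts, R right,
                        NavigableMap<Long, List<L>> leftBuffer, // other side's buffer for this key
                        long lowerBound, long upperBound,
                        BiConsumer<L, R> collector) {
        // Range scan over [ts + lowerBound, ts + upperBound] on every element.
        for (Map.Entry<Long, List<L>> bucket :
                leftBuffer.subMap(ts + lowerBound, true, ts + upperBound, true).entrySet()) {
            for (L left : bucket.getValue()) {
                collector.accept(left, right);
            }
        }
    }
}
{code}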

In the proposed implementation, we plan to introduce two changes:
 * allow users to opt in to use cases like the above by inheriting from 
ProcessJoinFunction (see the sketch after this list)
 ** whether to skip triggering a scan when a left event (static data set) arrives
 ** allow cleaning up right-stream state earlier than the interval upper bound
 * leverage a RAM cache that builds a sortedMap on demand from the other side's 
buffer for each join key (see the cache sketch after the open discussion below); 
in our use cases, it helps
 ** expedite right-stream lookups of left buffers without accessing RocksDB 
every time (disk -> sorted in-memory cache)
 ** if a key sees an event from the left side, clear the cache and reload it 
from the right side
 ** in the worst case, both streams round-robin processElement1 and 
processElement2 on the same set of keys at the same frequency; performance is 
expected to be similar to the current implementation, and the memory footprint 
will be bounded by 1/2 of the state size
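
A minimal sketch of the first change, assuming hypothetical hook names on a 
user-provided ProcessJoinFunction subclass (skipScanOnLeft and 
rightCleanupUpperBound are placeholders for illustration, not an existing 
Flink API):

{code:java}
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;

// Sketch of the proposed opt-in: the user function inherits from
// ProcessJoinFunction and overrides two HYPOTHETICAL hooks. The hook names
// below are placeholders for this ticket, not part of the existing Flink API.
public class StaticLeftIntervalJoinFunction
        extends ProcessJoinFunction<String, String, String> {

    // Hypothetical hook: left events come from a slowly evolving static
    // dataset, so their arrival should not trigger a scan of the right buffer.
    public boolean skipScanOnLeft() {
        return true;
    }

    // Hypothetical hook: clean up right-stream state earlier than the
    // interval upper bound (here after 1 hour instead of days/weeks).
    public long rightCleanupUpperBound() {
        return 60 * 60 * 1000L;
    }

    @Override
    public void processElement(String left, String right, Context ctx, Collector<String> out) {
        out.collect(left + "," + right);
    }
}
{code}

With hooks like these, the operator can ask the user function whether a left 
element should trigger a scan and how long right-side state needs to be retained.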

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache?
 ** if a given key sees an insertion from the other side, the cache for that 
key is cleared and rebuilt (see the sketch after this list)
 * what happens on checkpoint/restore?
 ** state still persists in the state backend; the cache is cleared and rebuilt 
for each newly seen key
 * how is performance?
 ** given the assumption that RAM is an order of magnitude faster than SSD, and 
much more so than spinning disk, populating the cache adds a small overhead 
(1% - 5%); compared with the current RocksDB implementation, which does a full 
scan of the other buffer on every event, it saves the bucket-scan logic, and if 
a key is accessed more than once in the same direction while cached, we expect 
a significant performance gain
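
A minimal sketch of the per-key sorted cache and the invalidation rules 
discussed above, assuming a default capacity of 1 key and a TreeMap keyed by 
timestamp (class and method names are illustrative only, not the actual 
implementation):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch of the one-side sorted cache: for the single cached
// key, keep the other side's buffer as a sorted in-memory map; clear and
// rebuild it when the key changes, the other side sees an insertion, or the
// operator is restored from a checkpoint.
final class OneSideSortedCacheSketch<T> {

    private Object cachedKey;                   // default cache size: 1 key
    private NavigableMap<Long, List<T>> cache;  // timestamp -> buffered records

    /** Range lookup served from RAM; rebuilds the cache from the backend on a miss. */
    List<T> lookup(Object key, long from, long to,
                   NavigableMap<Long, List<T>> backendBuffer /* read from RocksDB */) {
        if (cache == null || !key.equals(cachedKey)) {
            // Key miss, dirty cache, or post-restore state: rebuild from the backend.
            cachedKey = key;
            cache = new TreeMap<>(backendBuffer);
        }
        List<T> result = new ArrayList<>();
        for (Map.Entry<Long, List<T>> bucket : cache.subMap(from, true, to, true).entrySet()) {
            result.addAll(bucket.getValue());
        }
        return result;
    }

    /** The cached key saw an insertion from the other side: the cache is dirty, drop it. */
    void onOtherSideInsert(Object key) {
        if (key.equals(cachedKey)) {
            cache = null;
        }
    }

    /** After restore, state still lives in the state backend; start with an empty cache. */
    void onRestore() {
        cachedKey = null;
        cache = null;
    }
}
{code}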

 


> oneside sorted cache in intervaljoin
> ------------------------------------
>
>                 Key: FLINK-16392
>                 URL: https://issues.apache.org/jira/browse/FLINK-16392
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: Chen Qin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
