[ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-----------------------------
    Description: 
IntervalJoin is seeing many use cases on our side. Those use cases share the 
following pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the state backend (RocksDB here). As 
RocksDB fetches and updates get more expensive, performance takes a hit, 
blocking large use cases.
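The cost described above can be modeled as a full linear scan of the opposite buffer on every incoming event. The sketch below is a simplified stand-in for the RocksDB-backed buffer iteration, not Flink's actual IntervalJoinOperator code; the class and method names are hypothetical:

```java
import java.util.*;

// Simplified model of the current behavior: every right-stream event
// iterates the entire left buffer for its key, even though only a small
// time window of entries can possibly match.
public class FullScanJoin {
    public static List<String> lookup(Map<Long, String> leftBuffer,
                                      long ts, long lowerBound, long upperBound) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<Long, String> e : leftBuffer.entrySet()) {
            // O(n) per event: every entry is visited regardless of its timestamp.
            if (e.getKey() >= ts + lowerBound && e.getKey() <= ts + upperBound) {
                matches.add(e.getValue());
            }
        }
        return matches;
    }
}
```

With a lookup range of days to weeks and a high-QPS right stream, this per-event full scan is where the RocksDB cost concentrates.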

In the proposed implementation, we plan to introduce two changes:
 * allow users to opt in by inheriting from ProcessJoinFunction
 ** if they want to skip triggering a scan on events from the left stream 
(static data set)
 * build a sortedMap on demand from the otherBuffer for each join key; in our 
use cases, this
 ** expedites right-stream lookups of left buffers without accessing RocksDB 
every time (disk -> sorted in-memory cache)
 ** if a key sees an event from the left side, it clears the cache and reloads 
it from the right side
 ** in the worst-case scenario, the two streams round-robin processElement1 
and processElement2 over the same set of keys at the same frequency. 
Performance is expected to be similar to the current implementation, and the 
memory footprint will be bounded by 1/2 of the state size.
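The on-demand sorted cache above can be modeled with a plain TreeMap per join key. This is an illustrative sketch only, under the assumption that left-side records are loaded once and then range-scanned in memory; it does not use Flink's real state APIs, and the class and method names are hypothetical:

```java
import java.util.*;

// Hypothetical model of the proposed per-key sorted cache: left-side
// records for one join key are loaded from the state backend once, kept
// sorted by timestamp, and range-scanned in memory on each right event.
public class SortedBufferCache {
    // timestamp -> records from the left (static) stream for one key
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();

    public void addLeft(long timestamp, String record) {
        leftBuffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(record);
    }

    // For a right-stream event at `ts`, join against left records in
    // [ts + lowerBound, ts + upperBound] without touching RocksDB.
    public List<String> lookup(long ts, long lowerBound, long upperBound) {
        List<String> matches = new ArrayList<>();
        for (List<String> bucket :
                leftBuffer.subMap(ts + lowerBound, true, ts + upperBound, true).values()) {
            matches.addAll(bucket);
        }
        return matches;
    }

    // Invalidate when the key sees a new left-side event (dirty cache).
    public void clear() {
        leftBuffer.clear();
    }
}
```

The TreeMap.subMap range view turns the per-event full scan into an O(log n + matches) lookup, which is the source of the expected speedup when the left side rarely changes.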

 

Open discussion
 * how to control cache size?
 ** TBD
 * how to avoid a dirty cache?
 ** if a given key sees an insertion from the other side, the cache for that 
key is cleared and rebuilt. Repopulating the cache is a small overhead; 
compared with the current RocksDB implementation, which does a full loop at 
every event, it saves on bucket-scan logic.
 * what happens on checkpoint/restore?
 ** state still persists in the state backend; the cache is cleared and 
rebuilt for each newly seen key.
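The clear-and-rebuild answer to both the dirty-cache and restore questions can be sketched as a lazy reload: the per-key cache is dropped on an opposite-side insert (or restore) and repopulated from the state backend on the next lookup. The backend below is stubbed with a loader function; all names are hypothetical:

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical sketch of clear-and-rebuild: the per-key sorted cache is
// invalidated when the key sees a new opposite-side event (or after a
// restore), then lazily reloaded from a (stubbed) state backend on the
// next lookup.
public class LazyKeyCache {
    private final Map<String, TreeMap<Long, String>> cache = new HashMap<>();
    private final Function<String, TreeMap<Long, String>> backendLoader;

    public LazyKeyCache(Function<String, TreeMap<Long, String>> backendLoader) {
        this.backendLoader = backendLoader;
    }

    // Called when an opposite-side event arrives for `key`: drop the cached copy.
    public void invalidate(String key) {
        cache.remove(key);
    }

    // Called on a lookup: rebuild the sorted buffer from the backend if missing.
    public TreeMap<Long, String> get(String key) {
        return cache.computeIfAbsent(key, backendLoader);
    }
}
```

Because the authoritative state stays in the backend, dropping a cache entry is always safe; the cost is one reload per invalidation rather than one full scan per event.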

 

  was:
IntervalJoin is seeing many use cases. Those use cases share the following 
pattern:
 * the left stream is pulled periodically from a static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally. As 
RocksDB fetches and updates get more expensive, performance takes a hit, 
blocking large use cases.

In the proposed implementation, we plan to introduce two changes:
 * allow users to opt in via ProcessJoinFunction if they want to skip the scan 
when the intervaljoin operator receives events from the left stream (static 
data set)
 * build a sortedMap from the otherBuffer at per-seen-key granularity
 ** expedites right-stream lookups of left buffers without accessing RocksDB 
every time
 ** if a key sees an event from the left side, it clears the buffer and 
reloads it from the right side

 

Open discussion
 * how to control cache size?
 ** TBD
 * how to avoid a dirty cache?
 ** if a given key sees an insertion from the other side, the cache for that 
key is cleared and rebuilt. Repopulating the cache is a small overhead; 
compared with the current RocksDB implementation, which does a full loop at 
every event, it saves on bucket-scan logic.
 * what happens on checkpoint/restore?
 ** state still persists in the state backend; the cache is cleared and 
rebuilt for each newly seen key.

 


> oneside sorted cache in intervaljoin
> ------------------------------------
>
>                 Key: FLINK-16392
>                 URL: https://issues.apache.org/jira/browse/FLINK-16392
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: Chen Qin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
