[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chen Qin updated FLINK-16392:
-----------------------------
Description:
IntervalJoin is seeing many use cases on our side. Those use cases share the following pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS

In the current interval join implementation, both streams are treated equally, and each event pulls/scans keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit, which blocks large use cases.

In the proposed implementation, we plan to introduce two changes:
* allow users to opt in by inheriting from ProcessJoinFunction
** e.g. if they want to skip triggering a scan for left-side events (the static dataset)
* build a sortedMap on demand from the other side's buffer for each join key; in our use cases, this
** speeds up right-stream lookups of left buffers without hitting RocksDB every time (disk -> sorted in-memory cache)
** if a key sees an event from the left side, the cache is cleared and reloaded from the right side
** in the worst-case scenario, the two streams round-robin processElement1 and processElement2 over the same set of keys at the same frequency; performance is expected to be similar to the current implementation, and the memory footprint is bounded by 1/2 of the state size

Open discussion:
* How do we control the cache size?
** TBD
* How do we avoid a dirty cache?
** If a given key sees an insertion from the other side, the cache for that key is cleared and rebuilt. Repopulating the cache is a small overhead; with the current RocksDB implementation we do a full loop on every event, so this saves on bucket-scan logic.
* What happens on checkpoint/restore?
** State still persists in the state backend; the cache is cleared and rebuilt for each newly seen key.

was: IntervalJoin is getting lots of use cases.
Those use cases share the following pattern:
* the left stream is pulled periodically from a static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS

In the current interval join implementation, both streams are treated equally. As RocksDB fetches and updates get more expensive, performance takes a hit, which blocks large use cases.

In the proposed implementation, we plan to introduce two changes:
* allow users to opt in via ProcessJoinFunction if they want to skip the scan when the interval join operator receives events from the left stream (the static dataset)
* build a sortedMap from the otherBuffer at per-key granularity
** this speeds up right-stream lookups of left buffers without accessing RocksDB every time
** if a key sees an event from the left side, the buffer is cleared and reloaded from the right side

Open discussion:
* How do we control the cache size?
** TBD
* How do we avoid a dirty cache?
** If a given key sees an insertion from the other side, the cache for that key is cleared and rebuilt. Repopulating the cache is a small overhead; with the current RocksDB implementation we do a full loop on every event, so this saves on bucket-scan logic.
* What happens on checkpoint/restore?
** State still persists in the state backend; the cache is cleared and rebuilt for each newly seen key.

> oneside sorted cache in intervaljoin
> ------------------------------------
>
>                 Key: FLINK-16392
>                 URL: https://issues.apache.org/jira/browse/FLINK-16392
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: Chen Qin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
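The per-key sorted cache described in the proposal could be sketched roughly as follows. This is a minimal, self-contained illustration, not Flink's actual IntervalJoinOperator internals: the class and method names (OneSideSortedCache, lookup, invalidate) are hypothetical, and the real implementation would back the cache with keyed MapState in RocksDB.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the per-key sorted cache from this ticket: for each
 * join key, the other side's buffer is mirrored into an in-memory TreeMap
 * keyed by timestamp, so a range lookup [ts - before, ts + after] avoids a
 * RocksDB scan on every right-side event.
 */
class OneSideSortedCache<K, V> {
    // per-key sorted view of the cached (left) buffer: timestamp -> element
    private final Map<K, TreeMap<Long, V>> cache = new HashMap<>();

    /** Cache an element from the slow (left) side. */
    void put(K key, long timestamp, V value) {
        cache.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /**
     * Range lookup driven by a fast (right) side event: return cached left
     * elements whose timestamps fall within [ts - before, ts + after].
     */
    NavigableMap<Long, V> lookup(K key, long ts, long before, long after) {
        TreeMap<Long, V> sorted = cache.get(key);
        if (sorted == null) {
            return new TreeMap<>(); // cache miss: caller would rebuild from state
        }
        return sorted.subMap(ts - before, true, ts + after, true);
    }

    /**
     * Invalidation rule from the ticket: when a key sees an insertion from
     * the other side, drop that key's cache so it is rebuilt from state.
     */
    void invalidate(K key) {
        cache.remove(key);
    }
}
```

The TreeMap gives the sorted, range-scannable view that RocksDB otherwise provides via a bucket scan; the invalidate-and-rebuild rule is what bounds the "dirty cache" overhead discussed above.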
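The opt-in half of the proposal could look like the sketch below. These are plain-Java stand-ins, not Flink's real ProcessJoinFunction or operator classes, and the hook name skipLookupFromLeft() is purely illustrative: the idea is only that a user-overridable method lets left-side (static dataset) events be buffered without triggering a scan of the other buffer.

```java
// Hypothetical stand-in for a user join function with the proposed opt-in hook.
abstract class SketchJoinFunction<L, R> {
    /** Proposed opt-in: return true to skip the state scan for left-side events. */
    public boolean skipLookupFromLeft() {
        return false; // default keeps today's symmetric behavior
    }
}

// Hypothetical stand-in for the interval join operator's left-side path.
class SketchJoinOperator<L, R> {
    private final SketchJoinFunction<L, R> udf;
    int rightBufferScans = 0; // counts how often a left event triggers a scan

    SketchJoinOperator(SketchJoinFunction<L, R> udf) {
        this.udf = udf;
    }

    /** Called for each left-side (static dataset) element. */
    void processElement1(L left) {
        // The element would still be written into keyed state (elided here);
        // the other side's buffer is scanned only if the user has not opted out.
        if (!udf.skipLookupFromLeft()) {
            rightBufferScans++;
        }
    }
}
```

With the hook overridden to return true, periodic re-pulls of the static left dataset only refresh state, and all range lookups are driven by the high-QPS right stream.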