[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chen Qin updated FLINK-16392:
-----------------------------
    Description:

IntervalJoin is getting a lot of use cases on our side. Those use cases share the following pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS

In the current interval join implementation we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand for every event. As RocksDB fetches and updates get more expensive, performance takes a hit, which blocks the large use cases.

The proposed implementation introduces two changes:
* allow users to opt in to use cases like the above by inheriting from ProcessJoinFunction (see the API sketch below):
** whether to skip triggering a scan for left-side events (the static dataset)
** allow cleaning up right-stream state earlier than the interval upper bound
* keep an in-memory cache that builds a sorted map on demand from the other side's buffer for each join key (see the cache sketch below); in our use cases this helps because:
** right-stream lookups of left buffers no longer hit RocksDB on every event (disk -> sorted in-memory cache)
** if a key sees an event from the left side, the cache for that key is cleared and reloaded from the right side
** in the worst case the two streams alternate processElement1 and processElement2 round-robin over the same set of keys at the same frequency; performance is then expected to be similar to the current implementation, and the memory footprint is bounded by 1/2 of the state size

Open discussion:
* How is the cache size controlled?
** By default the cache holds 1 key.
* How do we avoid a dirty cache?
** If a given key sees an insertion from the other side, the cache for that key is cleared and rebuilt.
* What happens on checkpoint/restore?
** State still persists in the state backend; the cache is cleared and rebuilt for each newly seen key.
* How is performance affected?
** Assuming RAM is an order of magnitude faster than SSD, and far faster than spinning disk, populating the cache is a small overhead (1% - 5%), whereas the current RocksDB implementation does a full loop over the buffer for every event, so the bucket-scan logic is saved. If a key is accessed more than once from the same direction while cached, we expect a significant performance gain.
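A minimal sketch of what the user-facing opt-in could look like, assuming two hypothetical hook methods (skipScanOnLeft, rightCleanupRetentionMs) on a ProcessJoinFunction subclass; these hooks do not exist in Flink today, and their names, types, and the example key/value types are illustrative only.

{code:java}
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical user function for the "static left / high-QPS right" pattern.
 * The two hint methods are NOT part of Flink's API; they only illustrate the
 * opt-in hooks proposed here and would be queried by the interval join operator.
 */
public class StaticLeftIntervalJoinFunction
        extends ProcessJoinFunction<String, String, String> {

    // Hypothetical hook: left events come from a slowly evolving static
    // dataset, so they should not trigger a scan of the right-side buffer.
    public boolean skipScanOnLeft() {
        return true;
    }

    // Hypothetical hook: retain right-side state for only 1 hour instead of
    // the full interval upper bound (days/weeks in our use cases).
    public long rightCleanupRetentionMs() {
        return 60 * 60 * 1000L;
    }

    @Override
    public void processElement(String left, String right, Context ctx, Collector<String> out) {
        out.collect(left + "," + right);
    }
}
{code}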
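And a minimal, Flink-agnostic sketch of the per-key sorted cache itself, under the assumption that the state backend can hand back the other side's buffer as a timestamp-keyed map; the class name and the loadFromStateBackend callback are illustrative, not existing Flink internals.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;

/**
 * Illustrative one-sided sorted cache for an interval join: for the single
 * "hot" key currently cached, keep the other side's buffer as a TreeMap
 * sorted by timestamp so range lookups avoid a RocksDB scan.
 */
public class OneSidedSortedCache<K, T> {

    private final Function<K, NavigableMap<Long, List<T>>> loadFromStateBackend;

    private K cachedKey;                              // default cache size: 1 key
    private NavigableMap<Long, List<T>> sortedBuffer; // other side's buffer, sorted by timestamp

    public OneSidedSortedCache(Function<K, NavigableMap<Long, List<T>>> loadFromStateBackend) {
        this.loadFromStateBackend = loadFromStateBackend;
    }

    /** Range lookup over [ts + lowerBound, ts + upperBound], building the cache on first access. */
    public Iterable<Map.Entry<Long, List<T>>> lookup(K key, long ts, long lowerBound, long upperBound) {
        if (!key.equals(cachedKey)) {
            // Cache miss: evict the previously cached key and rebuild from the state backend.
            cachedKey = key;
            sortedBuffer = new TreeMap<>(loadFromStateBackend.apply(key));
        }
        return sortedBuffer.subMap(ts + lowerBound, true, ts + upperBound, true).entrySet();
    }

    /** Called when {@code key} receives an insertion on the cached (other) side: drop the now-dirty cache. */
    public void invalidate(K key) {
        if (key.equals(cachedKey)) {
            cachedKey = null;
            sortedBuffer = null;
        }
    }

    /** On restore from a checkpoint the cache is simply dropped and rebuilt lazily from state. */
    public void clear() {
        cachedKey = null;
        sortedBuffer = null;
    }
}
{code}

With the default cache size of one key, a workload that strictly alternates sides for the same key degenerates to a rebuild on every lookup, which matches the worst case described above.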
> oneside sorted cache in intervaljoin
> ------------------------------------
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: Chen Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)