[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chen Qin updated FLINK-16392:
-----------------------------
    Description: 
The native interval join relies on the state backend (e.g. RocksDB) to insert/fetch the left and right buffers. This design choice minimizes the heap memory footprint, but it bounds the processing throughput of a single taskmanager by the RocksDB access speed. Here at Pinterest, we have some large use cases where developers join a large, slowly evolving data stream (e.g. post updates in the last 28 days) with a web-traffic data stream (e.g. post views up to 28 days after a given update). This poses some challenges to the current interval join implementation:
* Partitioned RocksDB needs to keep both updates and views for 28 days. The large buffers (especially on the view stream side) slow RocksDB down, and overall interval join performance degrades quickly as state builds up.
* The view stream is web scale; even with a large parallelism it can put a lot of pressure on each subtask and backpressure the entire job.

In the proposed implementation, we plan to introduce two changes (sketched in the code below):
* Support ProcessJoinFunction settings to opt in to an earlier cleanup time for the right stream (e.g. the view stream does not have to sit in the buffer for 28 days waiting for the update stream to join, because in event-time semantics the related post views happen after the update). This optimization reduces state size and improves RocksDB throughput. In the extreme case, the user can opt in to an in-flight join and skip writing into the right (view) stream buffer altogether, saving IOPS budget on each subtask.
* Support ProcessJoinFunction settings to expedite keyed lookups against the slowly changing stream. Instead of every post view pulling post updates from RocksDB, the user can opt in to keeping a one-side buffer cache in memory. When a post update arrives, the cache loads recent views from the right buffer and uses a sortedMap to find the matching buckets; when a post view arrives, the cache loads recent updates from the left buffer into memory. When another view for that post arrives, Flink saves the cost of a RocksDB access.
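To make the two proposed opt-ins concrete, a rough sketch of the target use case follows. The keyBy/intervalJoin/between/process calls are the existing DataStream API; PostUpdate, PostView, the join body, and the commented-out rightBufferRetention(...) setting are made-up names for illustration only, not an agreed API.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class PostIntervalJoinSketch {

    /** Wires up the 28-day update/view interval join described above. */
    static DataStream<String> joinUpdatesWithViews(
            DataStream<PostUpdate> updates, DataStream<PostView> views) {
        return updates
            .keyBy(u -> u.postId)
            .intervalJoin(views.keyBy(v -> v.postId))
            .between(Time.days(0), Time.days(28)) // views up to 28 days after a given update
            // Hypothetical opt-in (not an existing Flink setting): retain the right/view
            // buffer far shorter than 28 days, since views never wait for a later update:
            // .rightBufferRetention(Time.hours(1))
            .process(new ProcessJoinFunction<PostUpdate, PostView, String>() {
                @Override
                public void processElement(PostUpdate update, PostView view,
                                           Context ctx, Collector<String> out) {
                    out.collect(update.postId + " joined with view at " + view.eventTime);
                }
            });
    }

    static class PostUpdate { String postId; long eventTime; }
    static class PostView  { String postId; long eventTime; }
}
{code}

And a minimal sketch of the proposed one-side sorted cache, assuming the cache holds a single hot key at a time (the 1-key default mentioned in the open discussion below). None of this is existing Flink code; all names are illustrative.

{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustration only: a one-key in-memory cache over the "other side" buffer,
 * as proposed in the second change. All names are made up for this sketch.
 */
class OneSideSortedCache<T> {

    private String cachedKey;                                          // join key currently cached
    private final TreeMap<Long, List<T>> rowsByTime = new TreeMap<>(); // event time -> buffered rows

    /** Rebuild the cache for a key from the other side's RocksDB buffer (one scan). */
    void load(String key, Iterable<Map.Entry<Long, List<T>>> otherSideBuffer) {
        cachedKey = key;
        rowsByTime.clear();
        for (Map.Entry<Long, List<T>> bucket : otherSideBuffer) {
            rowsByTime.put(bucket.getKey(), bucket.getValue());
        }
    }

    /** Sorted-map range lookup that replaces a RocksDB scan; null means cache miss. */
    Collection<List<T>> lookup(String key, long ts, long relativeLowerBound, long relativeUpperBound) {
        if (!key.equals(cachedKey)) {
            return null; // caller falls back to RocksDB and may rebuild the cache for this key
        }
        return rowsByTime.subMap(ts + relativeLowerBound, true, ts + relativeUpperBound, true).values();
    }

    /** An insertion from the opposite side makes the cache stale for that key. */
    void invalidate(String key) {
        if (key.equals(cachedKey)) {
            rowsByTime.clear();
            cachedKey = null;
        }
    }
}
{code}

The idea of the sketch is that a TreeMap keeps the other side's buffer ordered by event time, so a relative-bound lookup becomes one in-memory subMap range scan instead of a RocksDB iteration per event.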
was:
IntervalJoin is getting lots of use cases on our side. Those use cases share the following pattern:
* The left stream is pulled from a low-QPS source.
* The lookup time range from the right stream into the left stream is very large (days to weeks).
* The right stream is web traffic with high QPS.

In the current interval join implementation, both streams are treated equally, and keyed state is pulled/scanned on demand from the backend (RocksDB here). As RocksDB fetches and updates become more expensive, performance takes a hit, which blocks large use cases.

In the proposed implementation, we plan to introduce two changes:
* Allow the user to opt in to more aggressive right-buffer cleanup
** allow the right stream to be cleaned up earlier than the interval upper bound
* Leverage a RAM cache that builds a sortedMap on demand from the other side's buffer for each join key; in our use cases, this helps
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, the cache is cleared and reloaded from the right side
** in the worst-case scenario, the two streams alternate processElement1 and processElement2 calls for the same set of keys at the same frequency; performance is expected to be similar to the current implementation, and the memory footprint is bounded by 1/2 of the state size

Open discussion
* How do we control cache size?
** By default the cache size is set to 1 key.
* How do we avoid a dirty cache?
** If a given key sees an insertion from the other side, the cache for that key is cleared and rebuilt.
* What happens on checkpoint/restore?
** State still persists in the state backend; the cache is cleared and rebuilt for each new key seen.
* How is performance?
** Under the assumption that RAM is orders of magnitude faster than RocksDB, populating the cache adds a small overhead (<5%). Compared with the current RocksDB implementation, where we do a full scan at every event, it saves on the bucket-scan logic. If a key sees more than one access in the same direction while cached, we expect a significant performance gain.


> oneside sorted cache in intervaljoin
> ------------------------------------
>
>                 Key: FLINK-16392
>                 URL: https://issues.apache.org/jira/browse/FLINK-16392
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.10.0
>            Reporter: Chen Qin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)