[
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kurt Young closed FLINK-8918.
-----------------------------
Resolution: Unresolved
Fix Version/s: (was: 1.8.0)
> Introduce Runtime Filter Join
> -----------------------------
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
>
> In general, stream join is one of the most expensive tasks. For every
> record from either side, we need to query the state of the other side, which
> leads to poor performance when the state is huge. So, in production,
> we always need to spend a lot of slots on stream joins. However, we
> can improve this, because one phenomenon shows up again and again in
> production stream joins: the `joined ratio` is often very
> low, for example:
> - stream join in promotion analysis: the job needs to join the promotion log
> with the action (click, view, payment, collection, retweet) log on
> `promotion_id` to analyze the effect of the promotion.
> - stream join in AD (advertising) attribution: the job needs to join the AD
> click log with the item payment log on `click_id` to find which click on
> which AD brought the payment, in order to do attribution.
> - stream join in click log analysis of docs: the job needs to join the view
> log (docs viewed by users) with the click log (docs clicked by users) to
> analyze the reasons for the clicks and the properties of the users.
> - ... and so on
> All these cases have one property in common: the _joined ratio_ is very
> low. Here is an example: imagine that we have 10000 records
> from the left stream and 10000 records from the right stream, and we execute
> _select * from leftStream l join rightStream r on l.id = r.id_. If we only get
> 100 records in the result, the _joined ratio_ is low. This is
> an example for inner join, but it also applies to left & right joins.
> I can come up with more examples of low _joined ratio_, but the most
> important point I want to make is that a low _joined ratio_ for stream
> joins in production is a very common phenomenon (maybe the most common
> one in some companies; at least in our company that is the case).
> *Then how to improve it?*
> As the case above shows, joining 10000 records with 10000 records yields only
> 100 results. That means we query the state 20000 times (10000 for the left
> stream and 10000 for the right stream), but only 100 of those queries are
> meaningful! If we could reduce the number of useless queries, we could
> definitely improve the performance of stream join.
> The way we improve this is to introduce the _Runtime Filter Join_.
> The main idea is that we build a _filter_ for the state on each side
> (left stream & right stream). When we need to query the state on one side, we
> first ask the corresponding _filter_ whether the _key_ can possibly be in the
> state. If the _filter_ says "no, it cannot be in the state", we skip
> querying the state; if it says "hmm, it may be in the state", we query
> the state. As you can see, the best choice for the _filter_ is a _Bloom
> Filter_; it has all the features we expect: _extremely good performance_ and
> _no false negatives_.
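
To make the "no false negatives" property concrete, here is a minimal Bloom filter sketch in plain Java. It is only an illustration, not the filter proposed for Flink: the class name `SimpleBloomFilter`, the bit-array size, the number of hash functions, and the seeded FNV-1a hashing are all assumptions chosen to keep the example short.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/** Minimal Bloom filter sketch; sizes and hash scheme are illustrative assumptions. */
class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** FNV-1a over the UTF-8 bytes of the key, with a seed mixed into the start value. */
    private static int hash(String key, int seed) {
        int h = 0x811C9DC5 ^ seed;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    /** Derives numHashes bit positions from two base hashes (double hashing). */
    private int[] indexes(String key) {
        int h1 = hash(key, 0);
        int h2 = hash(key, 0x9E3779B9);
        int[] idx = new int[numHashes];
        for (int i = 0; i < numHashes; i++) {
            idx[i] = Math.floorMod(h1 + i * h2, numBits);
        }
        return idx;
    }

    /** Sets every bit position derived from the key. */
    void add(String key) {
        for (int i : indexes(key)) {
            bits.set(i);
        }
    }

    /** false means "definitely never added"; true means "possibly added". */
    boolean mightContain(String key) {
        for (int i : indexes(key)) {
            if (!bits.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 16, 3);
        filter.add("click_42");
        // An added key is always reported as possibly present.
        System.out.println(filter.mightContain("click_42"));
        // A key that was never added is usually rejected, but may be a false positive.
        System.out.println(filter.mightContain("click_43"));
    }
}
```

A `false` answer lets the join skip the state query safely, because every bit of an added key stays set. A `true` answer still requires the query, since unrelated keys can collide on the same bits.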
>
> *The simplest pseudo code for _Runtime Filter Join_ (the inline comments are
> based on RocksDBBackend)*
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>   // iterator() performs a `seek()` on RocksDB and then iterates one by one;
>   // this is an expensive operation, especially when the key can't be found
>   // in RocksDB.
>   Iterator<Record> rightIterator = rightStreamState.iterator();
>   while (rightIterator.hasNext()) {
>     Record recordFromRightState = rightIterator.next();
>     .......
>   }
> }
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
>   Iterator<Record> rightIterator = EMPTY_ITERATOR;
>   // perform the `seek()` only when the filter says the key may be in state
>   if (rightStreamFilter.containsCurrentKey()) {
>     rightIterator = rightStreamState.iterator();
>   }
>   while (rightIterator.hasNext()) {
>     Record recordFromRightState = rightIterator.next();
>     .......
>   }
>
>   // add the current key to the filter of the left stream.
>   leftStreamFilter.addCurrentKey();
> }
> {code}
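
The effect of the pseudo code on the 10000 x 10000 example can be checked with a small in-memory simulation. This is a sketch under loud assumptions: `RuntimeFilterJoinDemo`, `Side`, `process` and `run` are hypothetical names, a `HashMap` stands in for the RocksDB state, and an exact `HashSet` stands in for the Bloom filter, so there are no false positives. Keys are unique per side and exactly 100 keys overlap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RuntimeFilterJoinDemo {
    /** One side of the join: keyed state plus a filter over the keys in that state. */
    static class Side {
        final Map<Integer, List<String>> state = new HashMap<>(); // stand-in for RocksDB state
        final Set<Integer> filter = new HashSet<>();              // exact stand-in for a Bloom filter
        long stateLookups = 0;

        /** Counts and performs one state query. */
        List<String> lookup(int key) {
            stateLookups++;
            return state.getOrDefault(key, Collections.emptyList());
        }

        /** Stores the record in state and registers its key in the filter. */
        void add(int key, String record) {
            state.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
            filter.add(key);
        }
    }

    /** Handles one record arriving on `own`; returns the number of joined rows emitted. */
    static int process(int key, String record, Side own, Side other, boolean useFilter) {
        int emitted = 0;
        // With the filter enabled, query the other side's state only if the key may be there.
        if (!useFilter || other.filter.contains(key)) {
            emitted = other.lookup(key).size();
        }
        own.add(key, record);
        return emitted;
    }

    /** Runs the whole example; returns {joinedRows, stateLookups}. */
    static long[] run(boolean useFilter) {
        Side left = new Side();
        Side right = new Side();
        long joined = 0;
        for (int i = 0; i < 10_000; i++) {
            // Left keys are 0..9999, right keys are 9900..19899: 100 keys overlap.
            joined += process(i, "L" + i, left, right, useFilter);
            joined += process(i + 9_900, "R" + i, right, left, useFilter);
        }
        return new long[] {joined, left.stateLookups + right.stateLookups};
    }

    public static void main(String[] args) {
        long[] naive = run(false);
        long[] filtered = run(true);
        System.out.println("naive:    joined=" + naive[0] + " lookups=" + naive[1]);
        System.out.println("filtered: joined=" + filtered[0] + " lookups=" + filtered[1]);
    }
}
```

Under these assumptions both runs emit 100 joined rows, while the state lookups drop from 20000 to 100. With a real Bloom filter the filtered count would be 100 plus however many false positives the filter produces.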
> A description of Runtime Filter Join for batch join can be found
> [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
> (it was not originally written for stream join, but the idea carries over
> easily to `stream join`).