[ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8918:
---------------------------------
    Fix Version/s: 1.8.0

> Introduce Runtime Filter Join
> -----------------------------
>
>                 Key: FLINK-8918
>                 URL: https://issues.apache.org/jira/browse/FLINK-8918
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.7.0, 1.8.0
>
>
> In general, stream join is one of the most performance-costly tasks. For every
> record from either side, we need to query the state of the other side, which
> leads to poor performance when the state size is huge. So, in production, we
> often have to spend a lot of slots on stream joins. However, we can improve
> this by exploiting a phenomenon that is common in production: the
> _joined ratio_ of a stream join is often very low. For example:
>  - Stream join in promotion analysis: the job needs to join the promotion log
> with the action (click, view, payment, collection, retweet) log on
> `promotion_id` to analyze the effect of the promotion.
>  - Stream join in AD (advertising) attribution: the job needs to join the AD
> click log with the item payment log on `click_id` to find which click on which
> AD brought the payment, for attribution.
>  - Stream join in click-log analysis of docs: the job needs to join the view log
> (docs viewed by users) with the click log (docs clicked by users) to analyze
> the reasons for the clicks and the properties of the users.
>  - … and so on.
> All these cases share one property: the _joined ratio_ is very low. For
> example, imagine that we have 10000 records from the left stream and 10000
> records from the right stream, and we execute
> _select * from leftStream l join rightStream r on l.id = r.id_, but get only
> 100 records in the result. That is a low _joined ratio_. This example uses an
> inner join, but the same applies to left and right joins.
> I can come up with more examples of a low _joined ratio_, but the most
> important point is that a low _joined ratio_ is a very common phenomenon for
> stream joins in production (perhaps even the typical case in some companies;
> at least it is in ours).
> *Then how can we improve it?*
> In the case above, joining 10000 records with 10000 records yields only 100
> results. That means we query the state 20000 times (10000 for the left stream
> and 10000 for the right stream), but only 100 of those queries are meaningful.
> If we can reduce the number of useless queries, we can improve the performance
> of stream join significantly.
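To put rough numbers on the possible savings, here is a back-of-the-envelope estimate, assuming a per-side filter with some false-positive rate p, such as the Bloom filter proposed in the next paragraph. None of the parameters below come from the issue; they are assumptions: each side's filter is sized at 10 bits per key with k = 7 hash functions, each of the 100 joined results corresponds to exactly one probe that finds a partner, and p is approximated with the standard Bloom filter formula (1 - e^(-kn/m))^k. Probes whose key really is in the state always pass the filter; all other probes pass only with probability p.

```java
// Back-of-the-envelope estimate of how many state lookups survive a per-side filter.
// Assumed (not from the issue): 10 bits per key, k = 7 hash functions, and each of the
// 100 joined results corresponds to exactly one probe that hits the other side's state.
class LookupEstimate {

    /** Standard Bloom filter false-positive approximation: (1 - e^(-k*n/m))^k. */
    static double falsePositiveRate(long bits, long keys, int hashes) {
        return Math.pow(1.0 - Math.exp(-(double) hashes * keys / bits), hashes);
    }

    /** Hits always pass the filter (no false negatives); misses pass with probability p. */
    static double expectedStateLookups(long probes, long hits, double p) {
        return hits + p * (probes - hits);
    }

    public static void main(String[] args) {
        long probes = 20_000;     // 10000 left + 10000 right records, one probe each
        long hits = 100;          // probes that actually find a join partner
        long keysPerSide = 10_000;
        // Uses the final key count, so p is an upper bound: the filters fill up gradually.
        double p = falsePositiveRate(10 * keysPerSide, keysPerSide, 7);
        double lookups = expectedStateLookups(probes, hits, p);
        System.out.printf("false-positive rate = %.4f%n", p);
        System.out.printf("expected state lookups = %.0f (instead of %d)%n", lookups, probes);
    }
}
```

Under these assumptions p comes out at roughly 0.8%, and only about 263 of the 20000 probes still reach the state; the filter answers the rest on its own.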
> The approach we use to improve this is to introduce the _Runtime Filter Join_.
> The main idea is to build a _filter_ for the state on each side (left stream
> and right stream). Before querying the state on one side, we first check that
> side's _filter_ to see whether the _key_ could be in the state. If the
> _filter_ says "no, it cannot be in the state", we skip querying the state; if
> it says "hmm, it may be in the state", we query the state. The best choice for
> the _filter_ is a _Bloom Filter_, which has all the features we need:
> _extremely good performance_ and _no false negatives_.
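A minimal sketch of such a filter, assuming join keys can be represented as `long` values. `SimpleBloomFilter`, its SplitMix64-based key mixing and the double-hashing scheme are illustrative choices of this sketch, not part of Flink or of the proposal. The property the join relies on is visible in `mightContain`: it answers "no" only when one of the key's bits is unset, which cannot happen for a key that was added.

```java
import java.util.BitSet;

/**
 * Minimal Bloom filter over long join keys (hypothetical helper, not a Flink API).
 * mightContain() can return false positives but never false negatives.
 */
class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** SplitMix64 finalizer: scrambles the key so the derived bit positions look independent. */
    private static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }

    /** i-th bit position via double hashing: h1 + i * h2 (mod numBits). */
    private int position(long hash, int i) {
        int h1 = (int) hash;
        int h2 = (int) (hash >>> 32);
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(long key) {
        long hash = mix(key);
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(hash, i));
        }
    }

    boolean mightContain(long key) {
        long hash = mix(key);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(hash, i))) {
                return false; // a bit is unset: the key was definitely never added
            }
        }
        return true; // all bits set: the key may have been added
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(100_000, 7); // ~10 bits per key
        for (long key = 0; key < 10_000; key++) {
            filter.add(key);
        }
        int falsePositives = 0;
        for (long key = 10_000; key < 30_000; key++) { // keys that were never added
            if (filter.mightContain(key)) {
                falsePositives++;
            }
        }
        System.out.println("false positives: " + falsePositives + " / 20000");
    }
}
```

With about 10 bits per key and 7 hash functions, the measured false-positive count should land in the neighborhood of the theoretical rate of roughly 0.8% of the probed keys.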
>  
> *The simplest pseudo code for _Runtime Filter Join_ (the inline comments
> assume the RocksDB state backend)*
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>     // Performs a `seek()` on RocksDB and then iterates one by one; this is
>     // expensive, especially when the key cannot be found in RocksDB.
>     Iterator<Record> rightIterator = rightStreamState.iterator();
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         // ... join recordFromLeftStream with recordFromRightState and emit
>     }
> }
>
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
>     Iterator<Record> rightIterator = Collections.emptyIterator();
>     // Perform the `seek()` only when the right stream's filter says the
>     // current key may be in the right state.
>     if (rightStreamFilter.containsCurrentKey()) {
>         rightIterator = rightStreamState.iterator();
>     }
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         // ... join recordFromLeftStream with recordFromRightState and emit
>     }
>
>     // Add the current key to the left stream's filter.
>     leftStreamFilter.addCurrentKey();
> }
> {code}
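A runnable, self-contained version of the pseudo code above, under assumptions that are not part of the proposal: a `HashMap` stands in for the keyed RocksDB state, keys are `long`s, and each side's filter is a single-hash Bloom filter (k = 1) over a 2^20-bit `BitSet`, to keep the sketch short. Unlike the one-sided pseudo code, it treats both directions symmetrically: each record first probes the other side's state (only if that side's filter allows it), and is then stored on its own side with its key added to its own side's filter. The hypothetical counter `stateLookups` records how often the other side's state is actually queried.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of a runtime filter join; HashMap state and BitSet filters are stand-ins, not Flink APIs. */
class RuntimeFilterJoinSketch {
    static final int FILTER_BITS = 1 << 20;

    /** One join side: keyed "state" plus a Bloom filter (k = 1) over the keys stored so far. */
    static final class Side {
        final Map<Long, List<String>> state = new HashMap<>();
        final BitSet filter = new BitSet(FILTER_BITS);

        static int slot(long key) {
            long z = key * 0x9E3779B97F4A7C15L; // multiplicative hashing to spread the keys
            return (int) (z >>> 44);            // top 20 bits -> index in [0, 2^20)
        }

        boolean mightContain(long key) {
            return filter.get(slot(key));
        }

        void store(long key, String value) {
            state.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            filter.set(slot(key)); // add the key to this side's filter
        }
    }

    final Side left = new Side();
    final Side right = new Side();
    final boolean useFilter;
    long stateLookups = 0; // how often the other side's state was actually queried
    final List<String> results = new ArrayList<>();

    RuntimeFilterJoinSketch(boolean useFilter) {
        this.useFilter = useFilter;
    }

    /** Probe the other side (only if its filter allows it), then store the record on its own side. */
    private void process(long key, String value, Side own, Side other, boolean fromLeft) {
        if (!useFilter || other.mightContain(key)) {
            stateLookups++;
            List<String> matches = other.state.get(key);
            if (matches != null) {
                for (String match : matches) {
                    results.add(fromLeft ? value + "|" + match : match + "|" + value);
                }
            }
        }
        own.store(key, value);
    }

    void processLeft(long key, String value) { process(key, value, left, right, true); }

    void processRight(long key, String value) { process(key, value, right, left, false); }

    public static void main(String[] args) {
        for (boolean useFilter : new boolean[] {false, true}) {
            RuntimeFilterJoinSketch join = new RuntimeFilterJoinSketch(useFilter);
            // 10000 records per side; only keys 9900..9999 appear on both sides.
            for (long i = 0; i < 10_000; i++) {
                join.processLeft(i, "L" + i);
                join.processRight(i + 9_900, "R" + (i + 9_900));
            }
            System.out.println("useFilter=" + useFilter + " results=" + join.results.size()
                    + " stateLookups=" + join.stateLookups);
        }
    }
}
```

On the 10000-by-10000 example from the description (here only keys 9900 to 9999 appear on both sides), both variants produce the same 100 joined records, but the unfiltered join queries the state 20000 times, while the filtered join queries it only for the 100 matching keys plus any false positives.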
> A description of the Runtime Filter Join for batch joins can be found
> [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
> (it was not originally designed for stream joins, but the idea carries over
> to stream joins easily).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
