[
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
vinoyang reassigned FLINK-8601:
-------------------------------
Assignee: vinoyang (was: Sihua Zhou)
> Introduce ElasticBloomFilter for Approximate calculation and other situations
> of performance optimization
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: vinoyang
> Priority: Major
>
> h2. *Motivation*
> Several scenarios drove us to introduce the ElasticBloomFilter: stream join,
> data deduplication, and some special use cases. Our experience with it has
> been very good. For example, we implemented the Runtime Filter Join on top
> of it, which, compared with a "normal" stream join, improved performance
> while reducing resource consumption by about half.
> Below are the two most typical use cases optimized by the
> ElasticBloomFilter: "Runtime Filter Join" in detail and "Data
> Deduplication" in brief.
> *Scenario 1: Runtime Filter Join*
> In general, stream join is one of the most expensive tasks. For every
> record from either side, we need to query the state of the other side, which
> leads to poor performance when the state is huge. So, in production, we
> always need to spend a lot of slots on stream joins. However, this can be
> improved, because one phenomenon shows up again and again in production: the
> "joined ratio" of a stream join is often very low. For example:
> - stream join in promotion analysis: the job joins the promotion log with
> the action (click, view, buy) log on promotion_id to analyze the effect
> of the promotion.
> - stream join in AD (advertising) attribution: the job joins the AD click
> log with the item payment log on click_id to find which click on which AD
> led to the payment, for attribution.
> - stream join in doc click log analysis: the job joins the view log (docs
> viewed by users) with the click log (docs clicked by users) to analyze the
> reasons for the clicks and the properties of the users.
> - ...and so on
> All these cases share one property: the joined ratio is very low. As an
> example, suppose we have 10000 records from the left stream and 10000
> records from the right stream, and we execute select * from leftStream l
> join rightStream r on l.id = r.id, but get only 100 records in the result.
> That is a low joined ratio. This example uses an inner join, but the same
> applies to left and right joins.
> I can come up with more examples of a low joined ratio, but the point I
> want to raise is that a low joined ratio in production stream joins is a
> very common phenomenon (maybe even the most common one in some companies;
> at least that is the case in ours).
> *How to improve this?*
> In the case above, 10000 records join 10000 records and we get only 100
> results. That means we query the state 20000 times (10000 for the left
> stream and 10000 for the right stream), but only 100 of those queries are
> useful. If we could reduce the number of useless queries, we would
> definitely improve the performance of the stream join.
> Our way of doing this is the Runtime Filter Join. The main idea is to build
> a filter for the state on each side (left stream & right stream). Before
> querying the state on one side, we first ask the corresponding filter
> whether the key could be in that state. If the filter says "no, it cannot
> be in the state", we skip the state query; if it says "hmm, it may be in
> the state", we query the state.
> The best choice for this filter is a Bloom filter. It has all the features
> we want: extremely good performance and no false negatives.
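As a rough worked estimate for the 10000 x 10000 example above: each of the 100 results is emitted by a probe that found a match, so at most 100 of the 20000 probes can be hits. With a filter in front of the state, only the hits plus the false positives among the misses reach the state. The false-positive rate p = 0.02 used here is an assumed value for illustration, not a figure from this issue.

```latex
\begin{aligned}
N_{\text{probe}} &= 10000 + 10000 = 20000, \qquad N_{\text{hit}} \le 100,\\
\mathbb{E}[N_{\text{lookup}}] &\approx N_{\text{hit}} + p\,(N_{\text{probe}} - N_{\text{hit}})\\
  &= 100 + 0.02 \times 19900 = 498 \qquad (N_{\text{hit}} = 100,\ p = 0.02).
\end{aligned}
```

Under these assumptions, roughly 500 state lookups would remain out of 20000.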
> The simplest pseudo code for the Runtime Filter Join (the comments are based
> on RocksDBBackend):
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>     // Performs a `seek()` on RocksDB and then iterates one by one. This is
>     // an expensive operation, especially when the key can't be found in
>     // RocksDB.
>     Iterator<Record> rightIterator = rightStreamState.iterator();
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         ...
>     }
> }
>
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
>     Iterator<Record> rightIterator = EMPTY_ITERATOR;
>     // Perform the `seek()` only when the filter says the key may exist.
>     if (rightStreamFilter.containsCurrentKey()) {
>         rightIterator = rightStreamState.iterator();
>     }
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         ...
>     }
>     // Add the current key to the filter of the left stream.
>     leftStreamFilter.addCurrentKey();
> }
> {code}
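To make the pseudo code above concrete, here is a minimal, Flink-independent sketch in plain Java. Everything in it is hypothetical and chosen for illustration only: the class names `RuntimeFilterJoinSketch`, `SimpleBloomFilter` and `JoinSide`, the in-memory `HashMap` standing in for the RocksDB state, the filter size, and the hashing scheme. It is not the ElasticBloomFilter proposed in this issue.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a join side whose state is guarded by a Bloom filter. */
class RuntimeFilterJoinSketch {

    /** Minimal Bloom filter: k probe positions derived from two hashes. */
    static final class SimpleBloomFilter {
        private final BitSet bits;
        private final int numBits;
        private final int numHashes;

        SimpleBloomFilter(int numBits, int numHashes) {
            this.bits = new BitSet(numBits);
            this.numBits = numBits;
            this.numHashes = numHashes;
        }

        private int index(Object key, int i) {
            int h1 = key.hashCode();
            int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1; // odd second hash
            return Math.floorMod(h1 + i * h2, numBits);
        }

        void add(Object key) {
            for (int i = 0; i < numHashes; i++) {
                bits.set(index(key, i));
            }
        }

        /** False means "definitely absent"; true means "possibly present". */
        boolean mightContain(Object key) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(index(key, i))) {
                    return false;
                }
            }
            return true;
        }
    }

    /** One side of the join: buffered records by key plus a filter over the keys. */
    static final class JoinSide {
        final Map<String, List<String>> state = new HashMap<>(); // stand-in for state
        final SimpleBloomFilter filter = new SimpleBloomFilter(1 << 16, 3);
        long stateLookups = 0; // counts the "expensive" accesses

        void buffer(String key, String record) {
            state.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
            filter.add(key);
        }

        List<String> probe(String key) {
            if (!filter.mightContain(key)) {
                return Collections.emptyList(); // skip the state lookup entirely
            }
            stateLookups++;
            return state.getOrDefault(key, Collections.emptyList());
        }
    }

    /** Probes the other side for matches, then buffers the record on its own side. */
    static List<String> join(String key, String record, JoinSide own, JoinSide other) {
        List<String> out = new ArrayList<>();
        for (String match : other.probe(key)) {
            out.add(record + "|" + match);
        }
        own.buffer(key, record);
        return out;
    }
}
```

Calling `join(key, record, left, right)` for left records and `join(key, record, right, left)` for right records mirrors `performRuntimeFilterJoin`. Because the filter never returns a false negative, skipping the lookup cannot drop a join result; a false positive only costs one unnecessary lookup.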
> *Scenario 2: Data Deduplication*
> We have implemented two general functions based on the ElasticBloomFilter:
> count(distinct x) and select distinct x, y, z from table. Unlike the
> Runtime Filter Join, the results of these two functions are approximate, not
> exact. They are meant for scenarios where we don't need a 100% accurate
> result, for example, counting the number of visiting users in each online
> store. In general, we don't need a 100% accurate result in this case (indeed
> we can't give one, because errors can occur when collecting user_id from
> different devices). If we can get a 98% accurate result with only 1/2 of the
> resources, that is a very good trade.
> {code:java}
> void countDistinctNormally(Key key, Iterable<Record> records) {
>     // 1 state query
>     final long oldVal = valState.get();
>     long val = oldVal;
>     // one state query per record
>     for (Record record : records) {
>         if (mapState.get(record) == null) {
>             ++val;
>             mapState.put(record, true);
>         }
>     }
>     if (val != oldVal) {
>         valState.update(val);
>     }
> }
>
> void countDistinctBF(Key key, Iterable<Record> records) {
>     // 1 state query
>     final long oldVal = valState.get();
>     long val = oldVal;
>     // In-memory filter checks only. A false positive makes a new record
>     // look already seen, so the count can be too low but never too high.
>     for (Record record : records) {
>         if (!bfState.contains(record)) {
>             ++val;
>             bfState.add(record);
>         }
>     }
>     if (val != oldVal) {
>         valState.update(val);
>     }
> }
> {code}
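For a feel of the accuracy versus memory trade-off in Scenario 2, the standard Bloom filter sizing formulas can be applied. The mapping from "98% accurate" to a false-positive rate of p = 0.02 and the figure of n = 10^6 distinct records are assumptions made here for illustration; m is the number of bits and k the number of hash functions.

```latex
\begin{aligned}
p &\approx \left(1 - e^{-kn/m}\right)^{k}, \qquad
m = -\frac{n \ln p}{(\ln 2)^{2}}, \qquad
k = \frac{m}{n}\,\ln 2,\\[4pt]
n = 10^{6},\ p = 0.02:\quad
m &\approx 8.14\,n \approx 8.14 \times 10^{6}\ \text{bits} \approx 1\ \text{MB}, \qquad
k \approx 5.64 \ \Rightarrow\ k = 6.
\end{aligned}
```

Since the false-positive rate only reaches p once all n records have been inserted, the expected undercount of `countDistinctBF` under these assumptions stays below 2%.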
> I believe there are more use cases in the streaming world that could be
> optimized by the Bloom filter (as it has done in the big data world)...
> *Required features and challenges*
> There are a few challenges in using a Bloom filter in Flink. First, it
> needs to be held as operator state because it needs to support 1) fault
> tolerance and 2) rescaling. Moreover, because we need to support rescaling,
> we need to create a Bloom filter for each key group to store the data that
> falls into it, so another challenge is how to 3) handle data skew (the
> amount of data that falls into different key groups can be very different).
> Imagine that we create a BF on each key group for the incoming data and that
> we are able to estimate the total amount of data; the question is then what
> size the BF on each key group should be. It is tricky, even impossible, to
> estimate the amount of data per key group. Finally, because the Bloom filter
> needs to live in memory to be extremely fast, we need a 4) TTL policy to
> reclaim memory; otherwise we will eventually hit an OOM. In summary, we need
> to fulfill at least the following features:
> - Fault tolerance (checkpoint & restore)
> - Rescaling
> - Handling of data skew
> - TTL policy
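One possible way to meet the skew and TTL requirements listed above is sketched below. This is a guess at the general shape, not the design from the linked document: each key group gets a chain of fixed-size filter units that grows only when that group actually receives data, and whole chains are dropped once they have not been written for longer than the TTL. All names (`KeyGroupFiltersSketch`, `Unit`, `Chain`), the 10-bits-per-key budget, the doubling policy, and the `hashCode % maxParallelism` key-group mapping (a simplified stand-in for Flink's real assignment) are assumptions.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one growable, expiring Bloom filter chain per key group. */
class KeyGroupFiltersSketch {

    /** Fixed-size Bloom filter unit sized for a given number of inserts. */
    static final class Unit {
        static final int NUM_HASHES = 4;
        final BitSet bits;
        final int numBits;
        final int capacity;
        int inserted = 0;

        Unit(int capacity) {
            this.capacity = capacity;
            this.numBits = capacity * 10; // assumed budget of about 10 bits per key
            this.bits = new BitSet(numBits);
        }

        private int index(Object key, int i) {
            int h1 = key.hashCode();
            int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
            return Math.floorMod(h1 + i * h2, numBits);
        }

        void add(Object key) {
            for (int i = 0; i < NUM_HASHES; i++) {
                bits.set(index(key, i));
            }
            inserted++;
        }

        boolean mightContain(Object key) {
            for (int i = 0; i < NUM_HASHES; i++) {
                if (!bits.get(index(key, i))) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Chain of units for one key group; appends a larger unit when the newest is full. */
    static final class Chain {
        final List<Unit> units = new ArrayList<>();
        long lastWriteMillis;

        void add(Object key, int initialCapacity, long now) {
            if (units.isEmpty()) {
                units.add(new Unit(initialCapacity));
            }
            Unit last = units.get(units.size() - 1);
            if (last.inserted >= last.capacity) {
                last = new Unit(last.capacity * 2);
                units.add(last);
            }
            last.add(key);
            lastWriteMillis = now;
        }

        boolean mightContain(Object key) {
            for (Unit unit : units) {
                if (unit.mightContain(key)) {
                    return true;
                }
            }
            return false;
        }
    }

    final Map<Integer, Chain> chains = new HashMap<>();
    final int maxParallelism;
    final int initialCapacity;
    final long ttlMillis;

    KeyGroupFiltersSketch(int maxParallelism, int initialCapacity, long ttlMillis) {
        this.maxParallelism = maxParallelism;
        this.initialCapacity = initialCapacity;
        this.ttlMillis = ttlMillis;
    }

    int keyGroupOf(Object key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    void add(Object key, long now) {
        chains.computeIfAbsent(keyGroupOf(key), g -> new Chain())
              .add(key, initialCapacity, now);
    }

    boolean mightContain(Object key) {
        Chain chain = chains.get(keyGroupOf(key));
        return chain != null && chain.mightContain(key);
    }

    /** Drops every chain that has not been written for longer than the TTL. */
    void expire(long now) {
        chains.values().removeIf(c -> now - c.lastWriteMillis > ttlMillis);
    }
}
```

In this sketch a hot key group simply accumulates more units while a cold one stays at a single small unit, so no per-group size estimate is needed up front. Keeping the chains grouped by key group is also what would let them be snapshotted and redistributed with their key groups on rescaling. Note that expiring a chain whose keys are still present in state would introduce false negatives, so the filter TTL would have to be aligned with the lifetime of the state it guards.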
> Design doc: [design
> doc|https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)