[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-8601:
------------------------------
    Description: 
h2. *Motivation*

Several scenarios drove us to introduce this ElasticBloomFilter: one is Stream Join, another is Data Deduplication, plus some special use cases... We have had very good experience with it; for example, we implemented a Runtime Filter Join on top of it, which gave us a great performance improvement. Compared to a "normal stream join", it allows us to improve performance while reducing resource consumption by about half.
Below I describe the two most typical use cases optimized by the ElasticBloomFilter: "Runtime Filter Join" in detail and "Data Deduplication" in brief.

*Scenario 1: Runtime Filter Join*

In general, stream join is one of the most performance-costly tasks. For every record from either side we need to query the state of the other side, which leads to poor performance when the state is huge. So, in production, we always need to spend a lot of slots to handle stream joins. However, we can improve on this, because there is a phenomenon of stream joins that can be observed in production: the "joined ratio" of a stream join is often very low. For example:
- stream join in promotion analysis: the job joins the promotion log with the action (click, view, buy) log on the promotion_id to analyze the effect of the promotion.
- stream join in AD (advertising) attribution: the job joins the AD click log with the item payment log on the click_id to find which click of which AD brought the payment, in order to do attribution.
- stream join in click log analysis of docs: the job joins the view log (docs viewed by users) with the click log (docs clicked by users) to analyze the reason for the click and the properties of the users.
- ... and so on

All these cases have one common property: the joined ratio is very low. Here is an example to describe it: we have 10000 records from the left stream and 10000 records from the right stream, and we execute {{select * from leftStream l join rightStream r on l.id = r.id}}, but we only get 100 records in the result. That is a low joined ratio. This example is an inner join, but the same applies to left and right joins.
There are more examples of low joined ratios I could come up with, but the point I want to raise is that a low joined ratio of stream joins in production is a very common phenomenon (maybe even the most common phenomenon in some companies; at least in our company that is the case).

*How to improve this?*

We can see from the above case: joining 10000 records with 10000 records yields only 100 results. That means we query the state 20000 times (10000 for the left stream and 10000 for the right stream), but only 100 of those queries are meaningful! If we can reduce the number of useless queries, we can definitely improve the performance of the stream join.
The way we improve this is to introduce the Runtime Filter Join. The main idea is to build a filter for the state on each side (left stream & right stream). When we need to query the state on one side, we first ask the corresponding filter whether the key could possibly be in that state: if the filter says "no, it cannot be in the state", we skip the state query; if it says "it may be in the state", we query the state. As you can see, the best choice for this filter is a Bloom filter: it has all the features we want, namely extremely good performance and no false negatives.

The simplest pseudo code for the Runtime Filter Join (the comments assume the RocksDB backend):
{code:java}
void performJoinNormally(Record recordFromLeftStream) {
    // Performs a `seek()` on RocksDB and then iterates entry by entry;
    // this is expensive, especially when the key can't be found in RocksDB.
    Iterator<Record> rightIterator = rightStreamState.iterator();
    for (Record recordFromRightState : rightIterator) {
        // ... join logic ...
    }
}

void performRuntimeFilterJoin(Record recordFromLeftStream) {
    // Perform the `seek()` only when the filter reports a possible hit.
    Iterator<Record> rightIterator = EMPTY_ITERATOR;
    if (rightStreamFilter.containsCurrentKey()) {
        rightIterator = rightStreamState.iterator();
    }
    for (Record recordFromRightState : rightIterator) {
        // ... join logic ...
    }
    // Add the current key into the filter of the left stream,
    // so the right side can skip useless lookups symmetrically.
    leftStreamFilter.addCurrentKey();
}
{code}
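
For clarity, here is a minimal sketch of the per-key filter the pseudo code above assumes. The method names come from the pseudo code; the interface itself is hypothetical, not an existing Flink API:
{code:java}
/**
 * Hypothetical sketch (not an existing Flink API) of the per-key filter
 * used above, analogous to Flink's keyed state: calls implicitly apply
 * to the key of the record currently being processed.
 */
interface RuntimeJoinFilter {

    /**
     * May return true for a key that was never added (false positive),
     * but never returns false for a key that was added (no false negative).
     * A false positive only costs one unnecessary state lookup.
     */
    boolean containsCurrentKey();

    /** Records the currently processed key in the filter. */
    void addCurrentKey();
}
{code}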

*Scenario 2: Data Deduplication*

We have implemented two general functions based on the ElasticBloomFilter: {{count(distinct x)}} and {{select distinct x, y, z from table}}. Unlike the Runtime Filter Join, the results of these two functions are approximate, not exact. They are used in scenarios where we don't need a 100% accurate result, for example counting the number of visiting users in each online store. In such a case we don't need a 100% accurate result (indeed we can't give one, because there can already be errors when collecting the user_id from different devices); if we can get a 98% accurate result with only half the resources, that is very attractive.
{code:java}
void countDistinctNormally(Key key, Iterator<Record> records) {
    // one state read
    final long oldVal = valState.get();
    long val = oldVal;
    // one state read per record
    for (Record record : records) {
        if (mapState.get(record) == null) {
            ++val;
            mapState.put(record, true);
        }
    }
    if (val != oldVal) {
        valState.update(val);
    }
}

void countDistinctBF(Key key, Iterator<Record> records) {
    // one state read
    final long oldVal = valState.get();
    long val = oldVal;
    // the per-record checks hit the in-memory Bloom filter instead of state
    for (Record record : records) {
        if (!bfState.contains(record)) {
            ++val;
            bfState.add(record);
        }
    }
    if (val != oldVal) {
        valState.update(val);
    }
}
{code}
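
As a side note, the trade-off between accuracy and memory is governed by the standard Bloom filter sizing formulas: for {{n}} expected elements and a target false-positive rate {{p}}, the optimal size is {{m = -n * ln(p) / (ln 2)^2}} bits with {{k = (m / n) * ln 2}} hash functions. A small illustrative helper (standard math, not part of any proposed API):
{code:java}
// Illustration only: standard Bloom filter sizing, not a proposed API.
final class BloomFilterSizing {

    /** Bits needed for n elements at false-positive rate p: m = -n * ln(p) / (ln 2)^2. */
    static long optimalBits(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /** Optimal number of hash functions: k = (m / n) * ln 2. */
    static int optimalNumHashes(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 10_000_000L; // expected distinct elements
        double p = 0.02;      // 2% false positives, roughly the "98% accurate" case above
        long m = optimalBits(n, p);
        // ~8.1 bits per element, i.e. roughly 10 MB for 10 million elements
        System.out.printf("bits = %d (%.1f per element), hashes = %d%n",
                m, (double) m / n, optimalNumHashes(n, m));
    }
}
{code}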

I believe there are more use cases in the streaming world that could be optimized by a Bloom filter (as it has already proven in the batch big-data world)...

*Required features and challenges*

There are a few challenges with using a Bloom filter in Flink. First, it needs to be held as operator state, because it must support 1) fault tolerance as well as 2) rescaling. Furthermore, because we need to support rescaling, we need to create a Bloom filter per key group to store the data that falls into it, so another challenge is 3) how to handle data skew (the amount of data that falls into different key groups can be very different). Imagine we create a BF on each key group for the incoming data and we are able to estimate the total amount of data; the question remains what size we should give the BF of each key group. It is tricky, even impossible, to estimate the amount of data per key group. Finally, because the Bloom filter needs to live in memory to deliver its extreme performance, we need a 4) TTL policy to recycle memory, otherwise we will eventually run out of memory (OOM). As a brief summary, we need to fulfill at least the following features (a rough sketch of such a structure follows the list):

- Fault tolerance (checkpointing & restore)
- Rescaling
- Handling data skew
- TTL policy
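
To make these requirements concrete, here is a very rough sketch of what a per-key-group filter could look like: it grows by chaining fixed-size segments (so no up-front per-group size estimate is needed, which addresses skew) and expires whole segments by age (a coarse TTL). All names and strategies here are illustrative assumptions, not the proposed design; the actual proposal is in the design doc linked below.
{code:java}
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

/**
 * Illustration only: one "elastic" filter per key group that grows on
 * demand by chaining fixed-size segments and expires whole segments by
 * insertion time. Assumed names and strategy, not the proposed design.
 */
class KeyGroupBloomFilter {
    private static final int SEGMENT_BITS = 1 << 20;      // 1 Mbit per segment
    private static final int SEGMENT_CAPACITY = 100_000;  // elements per segment

    private static final class Segment {
        final BitSet bits = new BitSet(SEGMENT_BITS);
        final long createdAt = System.currentTimeMillis();
        int count;
    }

    private final Deque<Segment> segments = new ArrayDeque<>();
    private final long ttlMillis;

    KeyGroupBloomFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void add(Object element) {
        expireOldSegments();
        Segment head = segments.peekFirst();
        if (head == null || head.count >= SEGMENT_CAPACITY) {
            head = new Segment();              // grow on demand -> handles skew
            segments.addFirst(head);
        }
        setBits(head.bits, element);
        head.count++;
    }

    boolean mightContain(Object element) {
        expireOldSegments();
        for (Segment s : segments) {           // no false negatives within the TTL
            if (testBits(s.bits, element)) {
                return true;
            }
        }
        return false;
    }

    private void expireOldSegments() {
        long now = System.currentTimeMillis();
        while (!segments.isEmpty()
                && now - segments.peekLast().createdAt > ttlMillis) {
            segments.removeLast();             // recycle memory (coarse TTL)
        }
    }

    private static void setBits(BitSet bits, Object element) {
        int h = element.hashCode();
        for (int i = 0; i < 4; i++) {          // 4 cheap derived hashes
            bits.set(Math.abs((h ^ ((h >>> (8 * i)) * 31)) % SEGMENT_BITS));
        }
    }

    private static boolean testBits(BitSet bits, Object element) {
        int h = element.hashCode();
        for (int i = 0; i < 4; i++) {
            if (!bits.get(Math.abs((h ^ ((h >>> (8 * i)) * 31)) % SEGMENT_BITS))) {
                return false;
            }
        }
        return true;
    }
}
{code}
A real implementation would additionally have to make the segments part of checkpointed state and redistribute them by key-group range on rescaling, which is exactly why the per-key-group layout is needed in the first place.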

Design doc: [design doc|https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing]


> Introduce ElasticBloomFilter for Approximate calculation and other situations 
> of performance optimization
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8601
>                 URL: https://issues.apache.org/jira/browse/FLINK-8601
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>


