[ https://issues.apache.org/jira/browse/FLINK-2907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14976126#comment-14976126 ]

Fabian Hueske commented on FLINK-2907:
--------------------------------------

Hi Greg,

this is a really interesting idea. Being able to filter out data before the
shuffle can save a lot of resources and time.
I guess you saw the bloom filter implementation in the current hash join. That
is, of course, only a local bloom filter: it helps to reduce the number of
probe-side records that are spilled to disk if the hash table does not fit into
the available memory.

I see mainly two questions for this issue:

# How can the proposed bloom filter be integrated?
I see two ways to implement this feature:
## Light-weight integration at the user API level. This would mean that bloom
filter creation, merging, and filtering are done in user code. The drawbacks
are that managed memory is not available at this level and that we face
serialization overhead because some operations cannot be chained. However, this
solution should be rather easy to implement (a rough sketch follows at the end
of this comment).
## New runtime operators for bloom filters. This would mean implementing three
additional drivers that operate on bloom filters: build local bloom filter,
merge bloom filters, and filter by bloom filter. These operations would need to
be integrated into the optimizer as well. You can have a look at the recent
addition of outer joins to get an idea of what this would involve. Due to some
restrictions for chaining (only non-branching flows can be chained), the bloom
filter building would not be chained, but I think the filtering should be
chainable. A further challenge for this approach is that we cannot give
guarantees about the available memory budget right now. Memory grants are just
relative shares of the total memory, so we either have to think about
strategies to work with less memory or add functionality to ensure that enough
memory is available.
# How should this feature be exposed to users?
If we make bloom filters part of the API, we can use them also for any kind of 
filtering. We could do something like:

{code}
DataSet<X> first = ...
DataSet<Y> second = ...
BloomFilter<Z> bf = first.buildBloomFilter("f0"); // "f0" is of type Z
DataSet<Y> filteredSecond = second.applyBloomFilter(bf, "f1"); // "f1" is of type Z
first.join(filteredSecond).where("f0").equalTo("f1") ...
{code}

I am not sure if this operation is common enough to be a first-class member of
the DataSet API (i.e., part of {{DataSet}}) or if it should rather go into the
{{DataSetUtils}} class.
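
To make variant 1.1 a bit more concrete, here is a rough sketch of how the
user-code version could look with today's DataSet API. It is only meant to
illustrate the idea, not a final design: it uses Guava's {{BloomFilter}} as a
stand-in, hardcodes the expected size and error rate, and glosses over how the
filter object is serialized and shipped. All field names and parameters are
made up for the example.

{code}
import java.util.List;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BloomFilterJoinSketch {

  /** Drops probe-side records whose key (f1) cannot have a build-side partner (f0). */
  public static DataSet<Tuple2<String, Long>> filterProbeSide(
      DataSet<Tuple2<Long, String>> buildSide,   // "first" above, join key at f0
      DataSet<Tuple2<String, Long>> probeSide) { // "second" above, join key at f1

    // 1) Build one bloom filter per partition over the build-side keys.
    DataSet<BloomFilter<Long>> localFilters = buildSide.mapPartition(
        new MapPartitionFunction<Tuple2<Long, String>, BloomFilter<Long>>() {
          @Override
          public void mapPartition(Iterable<Tuple2<Long, String>> records,
                                   Collector<BloomFilter<Long>> out) {
            // made-up expected insertions and false-positive rate
            BloomFilter<Long> bf =
                BloomFilter.create(Funnels.longFunnel(), 1000000, 0.02);
            for (Tuple2<Long, String> r : records) {
              bf.put(r.f0);
            }
            out.collect(bf);
          }
        });

    // 2) Merge the per-partition filters into one (bitwise OR of the bit sets).
    DataSet<BloomFilter<Long>> merged = localFilters.reduce(
        new ReduceFunction<BloomFilter<Long>>() {
          @Override
          public BloomFilter<Long> reduce(BloomFilter<Long> a, BloomFilter<Long> b) {
            a.putAll(b); // requires filters with identical size and parameters
            return a;
          }
        });

    // 3) Broadcast the merged filter and drop probe-side records it rejects.
    return probeSide
        .filter(new RichFilterFunction<Tuple2<String, Long>>() {
          private transient BloomFilter<Long> bf;

          @Override
          public void open(Configuration parameters) {
            List<BloomFilter<Long>> bc =
                getRuntimeContext().getBroadcastVariable("bloomFilter");
            bf = bc.get(0);
          }

          @Override
          public boolean filter(Tuple2<String, Long> record) {
            return bf.mightContain(record.f1);
          }
        })
        .withBroadcastSet(merged, "bloomFilter");
  }
}
{code}

Note that the per-partition filters are shipped through a single {{reduce}} and
the merged filter is then broadcast to every probe-side task. A runtime-operator
variant (1.2) could instead exchange the filter partitions directly among the
TaskManagers, as outlined in the issue description.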





> Bloom filter for Join
> ---------------------
>
>                 Key: FLINK-2907
>                 URL: https://issues.apache.org/jira/browse/FLINK-2907
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>    Affects Versions: 1.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>              Labels: requires-design-doc
>
> A bloom filter can be a chainable operation for probe side Join elements. An 
> element not matched by the bloom filter will not be serialized, shipped, 
> deserialized, and processed.
> Generating the bloom filter is a chainable operation over hash side elements. 
> The bloom filter created on each TaskManager must be the same size to allow 
> combining by xor. The most efficient means to distribute the bloom filter is 
> to assign each TaskManager an equal partition that it will receive from all 
> other TaskManagers. This will be broadcast once all local elements (by 
> hashing) and remote partitions (by xor) have been processed into that part of 
> the bloom filter.
> An example with numbers: triangle listing/counting joining 2B edges on 149B 
> two-paths resulting in 21B triangles (this is using the optimal algorithm). 
> At 8 bits per element the bloom filter will have a false-positive rate of ~2% 
> and require a 2 GB bloom filter (stored once and shared per TaskManager). 
> Each TaskManager both sends and receives data equivalent to the size of the 
> bloom filter (minus the local partition, the size of which trends towards 
> zero as the number of TaskManagers increases). The number of matched elements 
> is 21B (true positive) + ~0.02*(149B-21B) = 23.5B, a reduction of 84% or 1.5 
> TB (at 12 bytes per element). With 4 TaskManagers only 12 GB of bloom filter 
> would be transmitted, a savings of 99.2%.
> Key issues are determining the size of the bloom filter (dependent on the 
> count of hash side elements, the available memory segments, and the error 
> rate) and whether this can be integrated with Join or must be a separate 
> operator. This also depends on dynamic memory allocation as spilling to disk 
> would perform the serialization, write, read, and deserialization we are 
> looking to avoid.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
