Greg Hogan resolved FLINK-2907.
    Resolution: Later

Closing this issue with current thoughts:
- this would only be applicable for batch since the input must be fully 
materialized in order to generate and distribute the bloom filter
- without slot affinity the bloom filters may not be readily shareable on a 
single TaskManager
- even though the bloom filter is a compressed representation of the DataSet, 
for a large quantity of data the partitioned DataSet may fit in memory whereas 
the bloom filter may not
- without a priori knowledge of the size of the DataSet either a large number 
of memory segments must be reserved or multiple bloom filters created with 
increasing size
- the tradeoff with a bloom filter is additional lookups in memory but less 
shuffling of data; it's not clear that these lookups (which for even a modest 
data set will not be in CPU cache) are more efficient than (de)serializing 
elements to/from network buffers (multiple bloom filters exacerbate this)

I still think this would be very interesting to research but there are many 
other performance improvements I would like to prioritize.

> Bloom filter for Join
> ---------------------
>                 Key: FLINK-2907
>                 URL: https://issues.apache.org/jira/browse/FLINK-2907
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>              Labels: requires-design-doc
> A bloom filter can be a chainable operation for probe side Join elements. An 
> element not matched by the bloom filter will not be serialized, shipped, 
> deserialized, and processed.
> Generating the bloom filter is a chainable operation over hash side elements. 
> The bloom filter created on each TaskManager must be the same size to allow 
> combining by bitwise or. The most efficient means to distribute the bloom 
> filter is to assign each TaskManager an equal partition that it will receive 
> from all other TaskManagers. This will be broadcast once all local elements 
> (by hashing) and remote partitions (by bitwise or) have been processed into 
> that part of the bloom filter.
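A minimal sketch of combining equally-sized partial filters (all names, sizes, 
and the hashing scheme here are illustrative). The combine is bitwise, and 
using or keeps a bit set even when more than one partition happens to set it:

```python
import hashlib

FILTER_BITS = 1 << 16  # all partial filters must be the same size to combine

def positions(element, k=5):
    """k bit positions for an element (double hashing over SHA-256 halves)."""
    digest = hashlib.sha256(element.encode()).digest()
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big")
    return [(h1 + i * h2) % FILTER_BITS for i in range(k)]

def build_partial(elements):
    """Each TaskManager builds a filter over its local hash-side elements."""
    bits = bytearray(FILTER_BITS // 8)
    for element in elements:
        for pos in positions(element):
            bits[pos // 8] |= 1 << (pos % 8)
    return bits

def merge(partials):
    """Bitwise-or equally-sized partial filters into the global filter."""
    out = bytearray(FILTER_BITS // 8)
    for partial in partials:
        for i, byte in enumerate(partial):
            out[i] |= byte
    return out

def might_contain(bits, element):
    """True if every position is set: a definite or false-positive match."""
    return all(bits[p // 8] & (1 << (p % 8)) for p in positions(element))

# two "TaskManagers" build partial filters, then the results are merged
combined = merge([build_partial(["a", "b"]), build_partial(["c", "d"])])
```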
> An example with numbers: triangle listing/counting joining 2B edges on 149B 
> two-paths resulting in 21B triangles (this is using the optimal algorithm). 
> At 8 bits per element the bloom filter will have a false-positive rate of ~2% 
> and require a 2 GB bloom filter (stored once and shared per TaskManager). 
> Each TaskManager both sends and receives data equivalent to the size of the 
> bloom filter (minus the local partition, the size of which trends towards 
> zero as the number of TaskManagers increases). The number of matched elements 
> is 21B (true positive) + ~0.02*(149B-21B) = 23.5B, a reduction of 84% or 1.5 
> TB (at 12 bytes per element). With 4 TaskManagers only 12 GB of bloom filter 
> would be transmitted, a savings of 99.2%.
> Key issues are determining the size of the bloom filter (dependent on the 
> count of hash side elements, the available memory segments, and the error 
> rate) and whether this can be integrated with Join or must be a separate 
> operator. This also depends on dynamic memory allocation as spilling to disk 
> would perform the serialization, write, read, and deserialization we are 
> looking to avoid.
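The savings estimate in the quoted example can be reproduced directly (all 
constants are the figures quoted above):

```python
# Figures quoted in the example above
two_paths = 149e9          # probe-side elements
triangles = 21e9           # true matches
fp_rate = 0.02             # ~2% at 8 bits per element
bytes_per_element = 12

# Elements that pass the filter: true positives plus false positives
matched = triangles + fp_rate * (two_paths - triangles)        # ~23.5B
reduction = 1.0 - matched / two_paths                          # ~84%
saved_tb = (two_paths - matched) * bytes_per_element / 1e12    # ~1.5 TB

# 2B hash-side edges at 8 bits each give the 2 GB filter size
filter_gb = 2e9 * 8 / 8 / 1e9

# With 4 TaskManagers each sends and receives 3/4 of the filter,
# so 12 GB of filter traffic buys the ~1.5 TB reduction above
transmitted_gb = 2 * 4 * (3 / 4) * filter_gb
net_savings = 1.0 - transmitted_gb / (saved_tb * 1000)         # ~99.2%
```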

This message was sent by Atlassian JIRA
