Greg Hogan resolved FLINK-2907.
Closing this issue with current thoughts:
- this would only be applicable for batch since the input must be fully
materialized in order to generate and distribute the bloom filter
- without slot affinity the bloom filters may not be readily shareable on a
- even though the bloom filter is a compressed representation of the DataSet,
for a large quantity of data the partitioned DataSet may fit in memory whereas
the bloom filter may not
- without a priori knowledge of the size of the DataSet either a large number
of memory segments must be reserved or multiple bloom filters created with
- the tradeoff with a bloom filter is additional lookups in memory but less
shuffling of data; it's not clear that these lookups (which for even a modest
data set will not be in CPU cache) are more efficient than (de)serializing
elements to/from network buffers (multiple bloom filters exacerbates this
I still think this would be very interesting to research but there are many
other performance improvements I would like to prioritize.
> Bloom filter for Join
> Key: FLINK-2907
> URL: https://issues.apache.org/jira/browse/FLINK-2907
> Project: Flink
> Issue Type: New Feature
> Components: DataSet API
> Affects Versions: 1.0.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Labels: requires-design-doc
> A bloom filter can be a chainable operation for probe side Join elements. An
> element not matched by the bloom filter will not be serialized, shipped,
> deserialized, or processed.
> Generating the bloom filter is a chainable operation over hash side elements.
> The bloom filter created on each TaskManager must be the same size to allow
> combining by bitwise OR. The most efficient means to distribute the bloom
> filter is to assign each TaskManager an equal partition that it will receive
> from all other TaskManagers. Each partition is broadcast once all local
> elements (by hashing) and remote partitions (by OR) have been processed into
> that part of the bloom filter.
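
The build, combine, and probe steps described above can be sketched in a few lines. This is only an illustration in plain Python, not Flink code: the filter size, the hash count, the SHA-256 based double hashing, and every function name are assumptions made for the example. The union of two bloom filters over the same bit layout is the bitwise OR of their bit arrays, so the combining step uses OR.

```python
import hashlib

NUM_BITS = 1 << 16   # assumed size; must be identical on every worker
NUM_HASHES = 6       # assumed hash count
NUM_WORKERS = 4      # stands in for the TaskManagers


def bit_positions(key):
    """Derive NUM_HASHES bit positions from one digest via double hashing."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big") | 1  # odd step, never zero
    return [(h1 + i * h2) % NUM_BITS for i in range(NUM_HASHES)]


def build_local_filter(keys):
    """Hash-side step: set the bits for every locally held key."""
    bits = bytearray(NUM_BITS // 8)
    for key in keys:
        for pos in bit_positions(key):
            bits[pos // 8] |= 1 << (pos % 8)
    return bits


def might_contain(bits, key):
    """Probe-side step: False means the key is definitely absent."""
    return all(bits[pos // 8] & (1 << (pos % 8)) for pos in bit_positions(key))


def combine_partitioned(local_filters):
    """Each worker owns one equal slice, ORs in that slice from every
    worker, and the merged slices are then concatenated (the broadcast)."""
    workers = len(local_filters)
    size = len(local_filters[0])
    assert size % workers == 0, "filter must split into equal partitions"
    part = size // workers
    merged_parts = []
    for owner in range(workers):
        lo, hi = owner * part, (owner + 1) * part
        merged = bytearray(part)
        for local in local_filters:
            for i, byte in enumerate(local[lo:hi]):
                merged[i] |= byte
        merged_parts.append(merged)
    return bytearray(b"".join(merged_parts))


# Hash-side keys 0..999 spread over the workers; probe side is 0..4999.
hash_side = [list(range(w, 1000, NUM_WORKERS)) for w in range(NUM_WORKERS)]
global_filter = combine_partitioned([build_local_filter(k) for k in hash_side])
survivors = [k for k in range(5000) if might_contain(global_filter, k)]
print(len(survivors), "of 5000 probe elements would be shipped")
```

Only the elements in `survivors` would need to be serialized and shipped to the join; every true match is guaranteed to be among them, plus a small number of false positives.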
> An example with numbers: triangle listing/counting joining 2B edges on 149B
> two-paths resulting in 21B triangles (this is using the optimal algorithm).
> At 8 bits per element the bloom filter will have a false-positive rate of ~2%
> and require a 2 GB bloom filter (stored once and shared per TaskManager).
> Each TaskManager both sends and receives data equivalent to the size of the
> bloom filter (minus the local partition, the size of which trends towards
> zero as the number of TaskManagers increases). The number of matched elements
> is 21B (true positive) + ~0.02*(149B-21B) = 23.5B, a reduction of 84% or 1.5
> TB (at 12 bytes per element). With 4 TaskManagers only 12 GB of bloom filter
> would be transmitted, a savings of 99.2%.
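
The arithmetic in this example can be re-derived with a short script. This is a sketch under stated assumptions: the false-positive estimate uses the standard approximation (1 - e^(-k/b))^k with the hash count k rounded from b * ln 2, and the traffic model (each TaskManager ships its remote partitions once, then broadcasts its merged partition) is one reading of the distribution scheme described above. Variable names are illustrative.

```python
import math

edges = 2e9             # hash side elements
two_paths = 149e9       # probe side elements
triangles = 21e9        # true matches
bits_per_element = 8
bytes_per_element = 12
task_managers = 4

# Standard bloom filter estimate with the hash count rounded to an integer.
k = round(bits_per_element * math.log(2))
fp_rate = (1 - math.exp(-k / bits_per_element)) ** k

filter_bytes = edges * bits_per_element / 8

# The text rounds the false-positive rate to 2%; do the same here.
matched = triangles + 0.02 * (two_paths - triangles)
reduction = 1 - matched / two_paths
bytes_saved = (two_paths - matched) * bytes_per_element

# Each TaskManager sends (T-1)/T of its filter to the partition owners,
# then broadcasts its merged 1/T partition to the other T-1 TaskManagers.
filter_traffic = 2 * (task_managers - 1) * filter_bytes
net_savings = 1 - filter_traffic / bytes_saved

print(f"hash functions:       {k}")
print(f"false-positive rate:  {fp_rate:.4f}")
print(f"filter size:          {filter_bytes / 1e9:.1f} GB")
print(f"matched elements:     {matched / 1e9:.2f} B")
print(f"probe-side reduction: {reduction:.1%}")
print(f"data not shipped:     {bytes_saved / 1e12:.2f} TB")
print(f"filter traffic:       {filter_traffic / 1e9:.0f} GB")
print(f"net savings:          {net_savings:.1%}")
```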
> Key issues are determining the size of the bloom filter (dependent on the
> count of hash side elements, the available memory segments, and the error
> rate) and whether this can be integrated with Join or must be a separate
> operator. This also depends on dynamic memory allocation as spilling to disk
> would perform the serialization, write, read, and deserialization we are
> looking to avoid.
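
To make the sizing question concrete, here is a minimal sketch of how element count, error rate, and a memory budget interact. It uses the textbook formulas m = -n ln p / (ln 2)^2 and k = (m/n) ln 2. The function name, the returned fields, and the 32 KB segment size are assumptions for illustration and are not taken from the issue.

```python
import math

SEGMENT_BYTES = 32 * 1024  # assumed memory segment size


def size_filter(num_elements, target_fp, available_segments):
    """Pick a bit count and hash count, capped by the segment budget."""
    ideal_bits = math.ceil(-num_elements * math.log(target_fp) / math.log(2) ** 2)
    budget_bits = available_segments * SEGMENT_BYTES * 8
    bits = min(ideal_bits, budget_bits)
    hashes = max(1, round(bits / num_elements * math.log(2)))
    # Error rate actually expected for the chosen (possibly capped) size.
    expected_fp = (1 - math.exp(-hashes * num_elements / bits)) ** hashes
    return {
        "bits": bits,
        "hashes": hashes,
        "segments": math.ceil(bits / (SEGMENT_BYTES * 8)),
        "expected_fp": expected_fp,
        "capped": ideal_bits > budget_bits,
    }


roomy = size_filter(1_000_000, 0.02, available_segments=10_000)
tight = size_filter(1_000_000, 0.02, available_segments=8)
print(roomy)
print(tight)
```

With a generous budget the filter lands near the target rate; with only a few segments the size is capped and the expected false-positive rate degrades sharply, which is the case where the element count has to be known (or estimated) before the filter is allocated.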