[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386361#comment-16386361 ]
ASF GitHub Bot commented on FLINK-8601:
---------------------------------------

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/5641

    [FLINK-8601][WIP] Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

    This PR introduces PartitionedBloomFilter, which supports rescaling and handles data skew properly.

    ## Brief change log

    - Introduce PartitionedBloomFilter for approximate calculation and other performance optimization scenarios.

    ## Verifying this change

    This change can be verified by the unit tests in the files below:

    - PartitionedBloomFilterTest.java
    - LinkedBloomFilterTest.java
    - LinkedBloomFilterNodeTest.java
    - PartitionedBloomFilterManagerTest.java

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (yes)

    doc: [google doc](https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink bloomfilter_state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5641

----
commit 5429abe0031a93596b12dada6e9696f3179eb4e8
Author: summerleafs <summerleafs@...>
Date:   2018-02-06T16:47:25Z

    introduce bloom filter state.

commit 2d1f66c10fbf74272be76283b909b290ae55d4fd
Author: summerleafs <summerleafs@...>
Date:   2018-02-07T14:52:22Z

    add unit tests for bloom filter state.

commit 433370a12814f7bd80127d4508e1dd0812a9d3fe
Author: summerleafs <summerleafs@...>
Date:   2018-02-07T18:12:13Z

    add general type support.

commit 5e05ee84353516fe7ff6eb7dd3a01dfdb3337bc5
Author: summerleafs <summerleafs@...>
Date:   2018-02-09T15:10:11Z

    this is a tmp commit.

commit 6e4ff0cebed853c598e0647e9f8aa56b5b59d0cc
Author: summerleafs <summerleafs@...>
Date:   2018-02-10T14:30:13Z

    this is a tmp commit.

commit aa672e6e1e89b185722fde44a9b4044b87010c99
Author: summerleafs <summerleafs@...>
Date:   2018-02-10T15:32:01Z

    this is a tmp commit.

commit 3b04502ba277cad2a7b0bc381fb192d18b56f17d
Author: summerleafs <summerleafs@...>
Date:   2018-02-11T11:34:54Z

    fix build.

commit 775d6aaf354de35c7ddff242f8e006e13e9a0e76
Author: summerleafs <summerleafs@...>
Date:   2018-02-12T03:52:43Z

    add annotation for classes.

commit b7f04303aa1ec1fbe9696bb58b13838b6a74a7ae
Author: summerleafs <summerleafs@...>
Date:   2018-02-12T03:53:19Z

    a temp commit.

commit 28222bf5fc352a26082f2aee19be70ca5f9aa9d9
Author: sihuazhou <summerleafs@...>
Date:   2018-03-05T16:48:15Z

    fix build.

----

> Introduce PartitionedBloomFilter for Approximate calculation and other
> situations of performance optimization
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8601
>                 URL: https://issues.apache.org/jira/browse/FLINK-8601
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> h3. Background
> A Bloom filter is useful in many situations, for example:
> * 1. Approximate calculation: deduplication (e.g. UV calculation)
> * 2. Performance optimization: e.g. [runtime filter
> join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
>    By using a Bloom filter, we can greatly reduce the number of state
> queries in a stream join. The queries it filters out are ones that would
> ultimately find no results, which is the expensive case for RocksDB-based
> state because it requires traversing ```sst``` files on disk.
> However, with the state currently provided by Flink, it is hard to use a
> Bloom filter for the following reasons:
> * 1. Serialization problem: Bloom filter state can be large (for example,
> 100M). If it is implemented on top of RocksDB state, the state data must
> be serialized each time it is queried or updated, and performance will be
> very poor.
> * 2. Data skew: Data in different key groups can be skewed, and the degree
> of skew cannot be accurately predicted before the program runs. Therefore,
> it is impossible to determine how many resources the Bloom filter should
> allocate. One option is to allocate the space needed for the most skewed
> case, but this can waste a great deal of resources.
> h3. Requirement
> Therefore, I introduce the PartitionedBloomFilter for Flink, which needs to
> meet at least the following requirements:
> * 1. Support changing parallelism
> * 2. Serialize only when necessary: when performing a checkpoint
> * 3. Handle data skew: users only need to specify a PartitionedBloomFilter
> with the desired input and fpp (false positive probability), and the system
> allocates resources dynamically.
> * 4. Do not conflict with other state: users can still use KeyedState and
> OperatorState when using this Bloom filter.
> * 5. Support relaxed TTL (i.e. data is retained for at least the specified
> time)
> Design doc: [design
> doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
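
To illustrate requirement 3 in the quoted issue (specifying a filter by its desired input and fpp), here is a minimal sketch of how a plain Bloom filter can be sized from those two parameters using the standard formulas m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2. This is not the PartitionedBloomFilter from the pull request: the class name `SimpleBloomFilter`, the FNV-1a hash, and the double-hashing scheme are assumptions chosen for illustration, and partitioning by key group, rescaling, dynamic growth, and TTL are not covered.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/** Hypothetical minimal Bloom filter; NOT the PartitionedBloomFilter from the PR. */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(long expectedEntries, double fpp) {
        if (expectedEntries <= 0 || fpp <= 0.0 || fpp >= 1.0) {
            throw new IllegalArgumentException("need expectedEntries > 0 and 0 < fpp < 1");
        }
        // Optimal bit count: m = -n * ln(p) / (ln 2)^2, capped to what a BitSet can index.
        double m = -expectedEntries * Math.log(fpp) / (Math.log(2) * Math.log(2));
        this.numBits = (int) Math.min((long) Integer.MAX_VALUE, Math.max(1L, (long) Math.ceil(m)));
        // Optimal hash count: k = (m / n) * ln 2, at least one.
        this.numHashes = Math.max(1, (int) Math.round((double) numBits / expectedEntries * Math.log(2)));
        this.bits = new BitSet(numBits);
    }

    int numBits() { return numBits; }

    int numHashes() { return numHashes; }

    /** Records the value by setting one bit per hash function. */
    void put(String value) {
        long hash = fnv1a64(value.getBytes(StandardCharsets.UTF_8));
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(hash, i));
        }
    }

    /** Returns false if the value was definitely never added; true means "possibly added". */
    boolean mightContain(String value) {
        long hash = fnv1a64(value.getBytes(StandardCharsets.UTF_8));
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(hash, i))) {
                return false;
            }
        }
        return true;
    }

    // Double hashing: derive the i-th index from the two 32-bit halves of one 64-bit hash.
    private int index(long hash, int i) {
        int h1 = (int) hash;
        int h2 = (int) (hash >>> 32);
        return Math.floorMod(h1 + i * h2, numBits);
    }

    // 64-bit FNV-1a (an assumption for this sketch; any well-mixed hash would do).
    private static long fnv1a64(byte[] data) {
        long hash = 0xcbf29ce484222325L;
        for (byte b : data) {
            hash ^= (b & 0xff);
            hash *= 0x100000001b3L;
        }
        return hash;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1000, 0.01);
        filter.put("user-1");
        System.out.println("bits=" + filter.numBits() + ", hashes=" + filter.numHashes());
        System.out.println("user-1 possibly present: " + filter.mightContain("user-1"));
    }
}
```

A filter sized this way is fixed at construction time, which is exactly the limitation the issue describes under data skew: the expected input per key group is not known in advance, so a single up-front allocation is either wasteful or too small.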