GitHub user mohamagdy opened a pull request: https://github.com/apache/flink/pull/3353

    Multithreaded `DataSet#flatMap` function

    # Multi-Threaded FlatMap

    ## Overview

    The `DataStream#flatMap` function takes a `FlatMapFunction`, an interface whose `flatMap` method is called by `DataStream#flatMap`. The `FlatMapFunction#flatMap` method takes an element (a `DataStream` record) and applies a transformation to it. For example, if the element is a string, `flatMap` can split it on spaces or convert it to upper case.

    The current implementation of `DataStream#flatMap` uses a single thread to transform each element from one form to another. The idea of this change is to introduce a new API method, a `DataStream#flatMap` variant that takes a parallelism value for transforming the input elements.

    ## Implementation Details

    The following diagram shows the multithreaded `flatMap` function. Assume that the parallelism (the maximum thread pool size) is set to `3`, so 3 threads can run transformations on input elements.

    Briefly, when the `flatMap` function receives an element, it pushes the element to a _buffer_, then spawns a thread per element to do the transformation and write the result to the _output_. The _output_ is thread-safe: only 1 thread can write at a time. An `Object` is used as the lock on the output.

    The _buffer_ accumulates elements so that, when the _snapshot_ job runs while an element's transformation is not yet finished, the _buffer_ writes all of its elements, serialized, into an _output stream_. When a _restore_ is called, it deserializes the elements of the buffer and runs the transformation on them again, because their output state was not taken into consideration when the snapshot job ran.

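
    The flow described above (buffer, bounded thread pool, one shared lock on the output, and a snapshot of the in-flight elements) can be sketched in plain Java. This is only an illustration under stated assumptions, not the code in this pull request: `MultiThreadedFlatMapSketch`, its methods, and the `BiConsumer` that stands in for `FlatMapFunction#flatMap` are hypothetical names, and no Flink classes are used.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;

    // Hypothetical stand-in for the operator described above; not the PR's actual class.
    class MultiThreadedFlatMapSketch<IN, OUT> {
        private final ExecutorService pool;
        // Stand-in for FlatMapFunction#flatMap: takes an element and a collector.
        private final BiConsumer<IN, Consumer<OUT>> flatMapper;
        // Elements whose transformation has not finished yet.
        private final Queue<IN> buffer = new ConcurrentLinkedQueue<>();
        // Stand-in for the operator output.
        private final List<OUT> output = new ArrayList<>();
        // One lock shared by every worker, so only one thread writes at a time.
        private final Object lock = new Object();

        MultiThreadedFlatMapSketch(int parallelism, BiConsumer<IN, Consumer<OUT>> flatMapper) {
            if (parallelism <= 0) {
                throw new IllegalArgumentException("parallelism must be > 0");
            }
            this.pool = Executors.newFixedThreadPool(parallelism);
            this.flatMapper = flatMapper;
        }

        // Buffer the element, then transform it on a pool thread right away.
        void processElement(IN element) {
            buffer.add(element);
            pool.execute(() -> {
                flatMapper.accept(element, this::collect);
                buffer.remove(element); // reached only if the transformation did not throw
            });
        }

        private void collect(OUT record) {
            synchronized (lock) {
                output.add(record);
            }
        }

        // What a snapshot would have to persist: the elements still in flight.
        List<IN> snapshot() {
            return new ArrayList<>(buffer);
        }

        // Replay restored elements, since their output was not part of the snapshot.
        void restore(List<IN> pending) {
            pending.forEach(this::processElement);
        }

        // Stop accepting work and wait for the in-flight transformations.
        void close() {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("workers did not finish in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }

        List<OUT> results() {
            synchronized (lock) {
                return new ArrayList<>(output);
            }
        }

        public static void main(String[] args) {
            MultiThreadedFlatMapSketch<String, String> op =
                new MultiThreadedFlatMapSketch<String, String>(3, (line, out) -> {
                    for (String word : line.split(" ")) {
                        out.accept(word.toUpperCase());
                    }
                });
            op.processElement("hello flink");
            op.processElement("multi threaded flatmap");
            op.close();
            // The order across elements depends on thread scheduling.
            System.out.println(op.results());
        }
    }
    ```

    In this sketch an element leaves the buffer only after its output has been written, so a snapshot taken in between would replay an element whose output already exists. Whether the pull request handles that window is not stated in the description above.
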
    ![multithreaded flatmap](https://cloud.githubusercontent.com/assets/1228432/23085859/699a2926-f56a-11e6-9146-1be213caaaa7.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mohamagdy/flink parallel-dataset-flatmap

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3353

----
commit bf2c710fe8af2be3475d66221f9e6b8e2090bbfc
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-12T21:35:06Z

    [FLINK-XXXX] Fix JavaDoc class name

commit bbee6c580815190139f289d0309bfdc2f4ca83c6
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-12T21:36:09Z

    [FLINK-XXXX] Override `close()` method

    In order to introduce threads for processing `flatMap` elements, the
    `close` method in `StreamFlatMap` will be overridden to shut down the
    thread pool.

commit e0aff5e241aff97426d8c10e84ba5a932668d815
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-13T20:09:15Z

    [FLINK-XXXX] Organize imports in test files

    Ran IntelliJ organize imports

commit 73803bc8fc89f48691b4c0270b0061377a3e7e38
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-14T15:08:30Z

    [FLINK-XXXX] Fix typo in tests

commit b9343317eb35ef41aba82c70648dc8fd8767273e
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-14T22:21:46Z

    [FLINK-XXXX] Call `close` after `processElement` in tests

    In order to follow the flow described in the `StreamOperator` interface
    for the `close` method, which says the following:

    ```
    This method is called after all records have been added to the operators via the methods
    {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
    {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
    {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
    ```

commit fd0c28adceca9fbe5fc9e3ffd93679495edca02b
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-14T23:46:01Z

    [FLINK-XXXX] Add tests for multi-threaded `DataStream` `flatMap`

    Tweak tests to check results of `flatMap` when processing elements of
    `DataStream` in multiple threads.

commit cbe2fa8059272d8cb49e50056466564d6d8c322d
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-14T23:50:45Z

    [FLINK-XXXX] Add multiple threads for processing `flatMap` elements

    Add an option to the `DataStream` `flatMap` function that sets the
    parallelism of processing the `DataStream` elements. This helps when
    processing one element blocks the main thread and holds up the other
    elements. The main thread is blocked until all the elements are
    processed and all the threads finish.

commit 357a72d0bc5cca497eb80483ca32a8d958dfdbb5
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-15T16:44:46Z

    [FLINK-XXXX] Create a new `TimestampedCollector` for each thread

    In order to have a collector per thread instead of a single collector
    shared by all threads, though each `TimestampedCollector` still uses
    the same `Output`.

commit 43cf265582ef80e5437da1eaaa3a74113f0cbda2
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-15T21:19:28Z

    [FLINK-XXXX] Add checkpoint and restore to `StreamFlatMap`

    The checkpoint and restore methods are overridden in the `StreamFlatMap`.

commit 532b4a030d8ea820f7630c8b6c96e446b09f81c3
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-15T23:07:42Z

    [FLINK-XXXX] Implement `InputTypeConfigurable` to enable checkpoints for multithreaded `flatMap`

commit 2cca78cc800ccdcf5af4c01626012a1b485b03e4
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-16T10:01:16Z

    [FLINK-XXXX] Prevent multiple threads from writing into same `output` at the same time

    In order to prevent race conditions when writing into the same `output`
    of the `flatMap`, added a lock surrounding the call that writes into
    the `output`, so that only 1 thread can write to the `output` at a time.

commit 8bd5d3487d3420fffe3f79096172611bd3d12473
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-16T10:36:17Z

    [FLINK-XXXX] Move multi-threaded `streamFlatMap` to new class

    In order to organize and separate the logic of the single and
    multi-threaded `StreamFlatMap` class, it is cleaner to put each in a
    different class, as the logic of the multi-threaded `StreamFlatMap`
    started to deviate from the logic of the existing single-threaded
    `StreamFlatMap`.

commit b24046bf94f1e1301d463021a598ed2a8afb4f21
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-16T13:29:09Z

    [FLINK-XXXX] Fix `transformation` message for multithreaded `flatMap`

commit 96aaaf8b3f6a477bf0ad4d3b4d4025b0823da6f5
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-16T18:49:09Z

    [FLINK-XXXX] Use shared lock for `flatMap` output

    Before this change, each `MultiThreadedTimestampedCollector` object
    created its own lock, which does not guarantee that only 1 thread
    writes into the output at a time.

commit b310aeb6fc02a49709fb789d9e73b4c09b321f3b
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-16T19:33:56Z

    [FLINK-XXXX] Fix `parallelism` check to be > 0

    Instead of the >= 0 case; it does not make sense for the multi-threaded
    `flatMap` to accept a parallelism value of 0.

commit e52f6496d1e2d9a2bddacfd5848a51633a4a5576
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-16T23:00:17Z

    [FLINK-XXXX] Set `lock` object to be `transient`

commit 952d901e30a4d2a43fe5031e51a4c3e58cf279b6
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-17T17:43:09Z

    [FLINK-XXXX] Add logic for snapshot and notify of snapshot complete

    In order to make the multi-threaded `flatMap` fault tolerant, the
    snapshot and notify-on-snapshot-completion logic is implemented.

commit e30be1f41cdda5a83909cc99cdf8fbdd47ec2cbb
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-17T17:45:24Z

    [FLINK-XXXX] Remove unused variable

commit fd3f08c8936071d8f1e12468995640c2f03fa972
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-17T18:09:52Z

    [FLINK-XXXX] Replace the threads invocation to be right away instead of in `close` method

    Before this change, the `flatMap` element threads all ran at once via
    `invokeAll` on a list of tasks that was populated as each element was
    received. That is not workable, as it causes issues for snapshots: the
    elements are not processed until the `close` method is called.

commit 1f8365cd520c1e7ffb9b3467d595e1dd1e85b6c2
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-17T19:05:41Z

    [FLINK-XXXX] Use listener executor to handle buffer queue

    To handle the buffer queue, used a listener executor with `onSuccess`
    and `onFailure` callbacks (records are removed from the buffer when the
    thread succeeds).

commit c74d465e89899ab1c825c13f2940dba17b895959
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-17T19:43:09Z

    [FLINK-XXXX] Use same collector for all threads

    No need to create a new collector per thread; one collector is enough.
    The collector's output is synchronized and thread safe.

commit f054bf2539e8f93311c5feba66731bddd06a200c
Author: Mohamed Magdy <mohamed.ma...@fyber.com>
Date:   2017-02-17T20:40:00Z

    [FLINK-XXXX] Add snapshot/recover object serialization

    Add the logic for snapshot capturing by serializing the elements in the
    buffer and deserializing it in the restore process.

----

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---