[ 
https://issues.apache.org/jira/browse/FLINK-986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14237344#comment-14237344
 ] 

ASF GitHub Bot commented on FLINK-986:
--------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/incubator-flink/pull/254

    [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for 
intermediate results

    ## FLINK-25: Offer buffer-oriented runtime abstraction
    
    - The distributed runtime API was originally record-oriented. The output 
side has previously been refactored to a buffer-oriented API (`BufferWriter`). 
With this pull request, the input side offers a similar interface: 
`BufferReader`.
    
    - This enables us to directly work on the serialized data in the future. 
Previously, this was impossible as buffers were directly deserialized after 
being handed over to an input channel.
    
    - Currently, the buffer-oriented readers and writers are always wrapped by 
record-oriented readers and writers. The deserialization logic of the input 
channels has essentially moved to the readers.
    
    - The way of registering I/O has changed. The life-cycle of each task (see 
`AbstractInvokable`) involves two methods: `registerInputOutput()` and 
`invoke()`. The I/O setup was coupled with the creation of readers/writers in 
`registerInputOutput()`, although the required information is independent of 
this. Therefore, readers and writers are now directly instantiated by the runtime environment 
and can be accessed via `getEnvironment().getReader(int)` and 
`getEnvironment().getWriter(int)` respectively.
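    
    As a rough illustration of this life-cycle, the following sketch shows a
    task that looks up its reader and writer from the environment instead of
    constructing them. All types here (`SketchEnvironment`, `SketchInvokable`,
    `SketchBufferReader`, `SketchBufferWriter`, `ForwardingTask`) are
    simplified, hypothetical stand-ins and not the actual classes of this pull
    request; only the method names `registerInputOutput()`, `invoke()`,
    `getEnvironment()`, `getReader(int)` and `getWriter(int)` are taken from
    the description above.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    // Hypothetical stand-in for a buffer-oriented reader: hands out raw buffers.
    final class SketchBufferReader {
        private final Queue<byte[]> buffers = new ArrayDeque<byte[]>();

        void offer(byte[] buffer) {
            buffers.add(buffer);
        }

        // Returns the next buffer, or null when the input is exhausted.
        byte[] next() {
            return buffers.poll();
        }
    }

    // Hypothetical stand-in for a buffer-oriented writer: collects raw buffers.
    final class SketchBufferWriter {
        final List<byte[]> written = new ArrayList<byte[]>();

        void write(byte[] buffer) {
            written.add(buffer);
        }
    }

    // The environment, not the task, creates and owns the readers and writers.
    final class SketchEnvironment {
        private final SketchBufferReader[] readers;
        private final SketchBufferWriter[] writers;

        SketchEnvironment(int numReaders, int numWriters) {
            readers = new SketchBufferReader[numReaders];
            writers = new SketchBufferWriter[numWriters];
            for (int i = 0; i < numReaders; i++) {
                readers[i] = new SketchBufferReader();
            }
            for (int i = 0; i < numWriters; i++) {
                writers[i] = new SketchBufferWriter();
            }
        }

        SketchBufferReader getReader(int index) {
            return readers[index];
        }

        SketchBufferWriter getWriter(int index) {
            return writers[index];
        }
    }

    // Simplified task life-cycle with the two methods named above.
    abstract class SketchInvokable {
        private SketchEnvironment environment;

        void setEnvironment(SketchEnvironment environment) {
            this.environment = environment;
        }

        SketchEnvironment getEnvironment() {
            return environment;
        }

        abstract void registerInputOutput();

        abstract void invoke();
    }

    // A task that forwards every input buffer unchanged.
    final class ForwardingTask extends SketchInvokable {
        private SketchBufferReader reader;
        private SketchBufferWriter writer;

        @Override
        void registerInputOutput() {
            // Look up the pre-built reader and writer instead of constructing them.
            reader = getEnvironment().getReader(0);
            writer = getEnvironment().getWriter(0);
        }

        @Override
        void invoke() {
            byte[] buffer;
            while ((buffer = reader.next()) != null) {
                writer.write(buffer);
            }
        }
    }
    ```
    
    Whether a task still performs this lookup in `registerInputOutput()` or
    only in `invoke()` is an assumption of the sketch.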
    
    ## FLINK-986: Add initial support for intermediate results
    
    - This commit introduces the abstraction of intermediate results into the 
distributed runtime. It is essentially the runtime-part of @StephanEwen's 
rework of the scheduler/execution graph 
([FLINK-1094](https://issues.apache.org/jira/browse/FLINK-1094)), which made 
the scheduler/execution graph aware of intermediate results. Job graphs used to 
essentially look like this: `Source => Task i => ... => Sink`. With Stephan's 
rework, they already look like this: `Task:Intermediate Result <= Task 
i:Intermediate Result <= ... <= Task:Intermediate Result`. Tasks produce 
intermediate results and attach to them for consumption. These changes 
logically decoupled produced results and tasks in the job graph. At the runtime 
level, the intermediate results were still tightly coupled to the producing and 
consuming task. With this pull request, the producing task, the produced 
intermediate result, and the consuming task are decoupled as well.
    
    - Previously, the network stack was responsible for packaging buffers and 
immediately dispatching them to their receivers in a push fashion. The first 
buffer for a receiver resulted in the scheduling of this receiver. With the new 
model, we gain much more flexibility in the way we can produce and consume these 
intermediate results. The current state is feature-equivalent with the current 
master, i.e. the produced intermediate result is pipelined and not persistent. 
I ran a few performance comparisons and the performance was in the same 
ballpark as the current master, although it was (consistently) slightly slower.
    
    ### Changes
    
    #### Buffer management/The life of a Buffer
    
    - The buffers for the network stack are allocated on the heap by the 
`NetworkBufferPool` (previously `GlobalBufferPool`) when each task manager 
starts up. These network buffers are divided uniformly at runtime 
among the tasks. The current default for this pool is set to 2048 buffers (each 
32k bytes), resulting in a total of 64 MB. Intermediate results get their 
buffers from this buffer pool. Therefore, for any in-memory persistent data set 
to make sense, we would have to give the network pool a lot more memory. I 
think that this is currently not desirable as our memory management is static, 
i.e. memory given to the network pool would be missing at operators like 
sorting or hashing. In my opinion, adaptive memory management 
([FLINK-1101](https://issues.apache.org/jira/browse/FLINK-1101)) is therefore a 
requirement before continuing with more runtime intermediate result variants 
(see below).
    
    - The buffer management for the pipelined intermediate results has 
essentially not changed. We need at least a single buffer per outgoing channel 
(or queue in the terms of the new runtime). When a task is submitted to a task 
manager, the network buffers are divided as follows: a single buffer pool **per 
produced result** (previously one for all produced results) and one pool **per 
consumed result** (unchanged).
    
    - A produced buffer used to take this high-level path: 
`BufferWriter => ChannelManager => NETWORK xor local InputChannel`. Now it is 
`BufferWriter => IntermediateResult`. What happens at the intermediate result 
is flexible depending on the `IntermediateResultType`. Early discussions with 
@StephanEwen suggested the following three dimensions:
    
      1. persistent/ephemeral,
      2. pipelined/blocking, and
      3. back pressure/no back pressure.
    
    The current state offers an *ephemeral-pipelined-back pressure* 
implementation, which is what we currently have. (I have removed code for a 
*persistent-blocking-no back pressure* variant for now.)
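    
    To make the three dimensions concrete, here is a minimal sketch that
    models them as three flags. `ResultTypeSketch` and its members are
    hypothetical names for illustration; the actual `IntermediateResultType`
    in this pull request may be shaped differently. The `consumable` rule
    restates the pipelined vs. blocking distinction from the scheduling
    section of this description.
    
    ```java
    // Hypothetical model of the three dimensions listed above.
    final class ResultTypeSketch {
        final boolean persistent;   // false means ephemeral
        final boolean pipelined;    // false means blocking
        final boolean backPressure; // false means no back pressure

        ResultTypeSketch(boolean persistent, boolean pipelined, boolean backPressure) {
            this.persistent = persistent;
            this.pipelined = pipelined;
            this.backPressure = backPressure;
        }

        // The variant this pull request implements.
        static final ResultTypeSketch CURRENT =
            new ResultTypeSketch(false, true, true);

        // The variant whose code was removed for now.
        static final ResultTypeSketch REMOVED_FOR_NOW =
            new ResultTypeSketch(true, false, false);

        // Pipelined results can be consumed once the first buffer exists;
        // blocking results only once the producer has finished.
        boolean consumable(int producedBuffers, boolean producerFinished) {
            return pipelined ? producedBuffers > 0 : producerFinished;
        }

        String describe() {
            return (persistent ? "persistent" : "ephemeral") + "-"
                + (pipelined ? "pipelined" : "blocking") + "-"
                + (backPressure ? "back pressure" : "no back pressure");
        }
    }
    ```
    
    Three independent flags give eight combinations in total, of which only
    the two named above are discussed here.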
    
    #### Intermediate results, intermediate result partitions and intermediate 
result partition queues
    
    - As noted above, tasks at the job graph level are associated with 
intermediate results. When scheduling these graphs, the intermediate result is 
divided into result partitions (associated with execution vertices). Each 
parallel sub task (execution vertex) produces one partition of this result (a 
task may produce multiple intermediate results).
    
    - At the runtime level, these partitions are further divided into queues 
depending on the degree of parallelism. For example, a map-reduce with a degree of 
parallelism of 2 might look like this at the map side:
    
    ```
                                                 +---------+
    +-------+               +-------------+  +=> | Queue 1 |
    | Map 1 | = produces => | Partition 1 | =|   +---------+
    +-------+               +-------------+  +=> | Queue 2 |
                                                 +---------+
    
                                                 +---------+
    +-------+               +-------------+  +=> | Queue 1 |
    | Map 2 | = produces => | Partition 2 | =|   +---------+
    +-------+               +-------------+  +=> | Queue 2 |
                                                 +---------+
    ```
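    
    The division of one partition into queues can be sketched as follows.
    `PartitionSketch` is a hypothetical class for illustration, and selecting
    the queue by hashing the record key is an assumption; the description
    above only states that the number of queues depends on the degree of
    parallelism. Indices are zero-based here, whereas the diagram counts from
    1.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    // Hypothetical sketch of one result partition split into per-consumer queues.
    final class PartitionSketch<T> {
        private final List<Queue<T>> queues = new ArrayList<Queue<T>>();

        // One queue per consumer sub task.
        PartitionSketch(int consumerParallelism) {
            for (int i = 0; i < consumerParallelism; i++) {
                queues.add(new ArrayDeque<T>());
            }
        }

        int numQueues() {
            return queues.size();
        }

        // Assumption: hash partitioning on the key selects the target queue.
        int queueIndexFor(Object key) {
            return Math.floorMod(key.hashCode(), queues.size());
        }

        void add(Object key, T record) {
            queues.get(queueIndexFor(key)).add(record);
        }

        Queue<T> queue(int index) {
            return queues.get(index);
        }
    }
    ```
    
    With a consumer parallelism of 2, each map sub task would own one such
    partition with two queues, matching the diagram.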
    
    #### Scheduling of consumers and receiver-initiated sending
    
    - Depending on the intermediate result type, it is necessary to deploy the 
consumers either when the first buffer is produced or after all buffers have 
been produced (depending on the pipelined vs. blocking type). The receivers 
then request the respective intermediate result partition queue from the task 
managers where the partitions were produced. For a data repartitioning as in a 
map-reduce, each sub task would request the queue matching its subtask number, 
e.g.:
    
    ```
                                                 +---------+                         +----------+
    +-------+               +-------------+  +=> | Queue 1 | <=======+=== requests = | Reduce 1 |
    | Map 1 | = produces => | Partition 1 | =|   +---------+         |               +----------+
    +-------+               +-------------+  +=> | Queue 2 | <==+    |
                                                 +---------+    |    |
                                                                |    |
                                                 +---------+    |    |
    +-------+               +-------------+  +=> | Queue 1 | <==+====+
    | Map 2 | = produces => | Partition 2 | =|   +---------+    |                    +----------+
    +-------+               +-------------+  +=> | Queue 2 | <==+======== requests = | Reduce 2 |
                                                 +---------+                         +----------+
    ```
    
    If the partitioning is not known or a task has no known consumers, the 
partition will consist of a single queue (or as many queues as the default 
degree of parallelism). When it is consumed, a re-partitioning task needs to be inserted.
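    
    The request pattern for a repartitioning can be sketched as a small
    helper that lists which queues a given consumer sub task asks for.
    `QueueRequest` and `RequestPlanSketch` are hypothetical names, not part of
    this pull request, and indices are zero-based.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical value type: one queue of one producer partition.
    final class QueueRequest {
        final int partitionIndex;
        final int queueIndex;

        QueueRequest(int partitionIndex, int queueIndex) {
            this.partitionIndex = partitionIndex;
            this.queueIndex = queueIndex;
        }

        @Override
        public String toString() {
            return "partition " + partitionIndex + " / queue " + queueIndex;
        }
    }

    final class RequestPlanSketch {
        // Consumer sub task i requests queue i from every producer partition.
        static List<QueueRequest> requestsFor(int consumerIndex, int numProducerPartitions) {
            List<QueueRequest> requests = new ArrayList<QueueRequest>();
            for (int p = 0; p < numProducerPartitions; p++) {
                requests.add(new QueueRequest(p, consumerIndex));
            }
            return requests;
        }
    }
    ```
    
    For the two-map, two-reduce case, `requestsFor(1, 2)` yields the second
    queue of both partitions, which is what the second reduce sub task
    consumes.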
    
    - The scheduling of consumers requires notifications to the central job 
manager, which can then deploy the consumers. For a single produced *pipelined* 
intermediate result, this results in `number of produced partitions` RPC calls 
to the job manager (see `ConsumerNotificationProtocol`). The consumer tasks can 
then be deployed with the necessary information to request the respective 
queues. This results in `number of consumer sub tasks` task deployment RPC 
calls. The first notification from a task manager results in the deployment of 
the receiver. It is possible that not all producer task locations are known at 
deployment time of a consumer task. At the moment, we follow the naive approach 
of sending task update RPC calls with information about the location of the 
producer for every partition. This currently results in a total of `N * M` RPC 
calls per produced intermediate result (where `N` is the number of produced 
partitions and `M` the number of consumer tasks).
    
    - For blocking results, this can be reduced to `2*N` as we are guaranteed 
to know all producer locations when all partitions are finished. It should also 
be possible to further reduce this number to the number of task managers by 
collecting the notifications per task manager and then sending them to the job 
manager at once.
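    
    To give a feeling for the difference between the two figures, the
    following sketch simply evaluates `N * M` and `2*N` as quoted above.
    `RpcCountSketch` is a hypothetical helper for illustration; it does not
    measure anything and does not break the counts down further than the text
    does.
    
    ```java
    // Hypothetical helper that evaluates the RPC call counts quoted above.
    final class RpcCountSketch {
        // Pipelined results: N * M task update calls per intermediate result.
        static long pipelinedTaskUpdateCalls(long producedPartitions, long consumerTasks) {
            return producedPartitions * consumerTasks;
        }

        // Blocking results: the 2*N figure.
        static long blockingCalls(long producedPartitions) {
            return 2 * producedPartitions;
        }
    }
    ```
    
    With illustrative values `N = M = 32`, this gives 1024 calls for the
    pipelined case versus 64 for the blocking case.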
    
    #### More robust shuffle
    
    - Shuffles were previously managed by a large per-task-manager component 
called the `ChannelManager`, which kept track of all channels. This component 
was huge and responsible for many different things (network buffers, channel 
lookup, buffer routing, TCP connections). This has been refactored into more 
localized components, where resource acquisition and release can be handled in 
the same local component, which is in my opinion much easier to follow and 
understand.
    
    - In particular, we had issues with the release of resources like channel 
lookup tables or TCP connections, which have been addressed with this pull 
request as well. Especially the Netty-based network code was not reliable in 
the way it propagated errors to the tasks and released all resources. 
Addressing this was a requirement for adding reliable fault-tolerance 
mechanisms in the future.
    
    ### Next steps
    
    The goal is to merge the current state as soon as possible and then start 
to extend it (I've removed some code that was already in place). I could have 
done this much earlier and intend to **not** make that mistake again.
    
    1. I've only tested on a small cluster (4 nodes) with a low degree of 
parallelism (32). I have to test further on a larger cluster with a higher 
degree of parallelism. I also didn't test iterative programs yet. After doing 
this, I will also post the numbers.
    2. I've already discovered a few problems in corner cases, which I need to 
fix before we review this.
    3. Some tests are failing and I've disabled a few others, which I will 
adjust and enable again.
    
    I intend to address these points over the course of this week and work on 
the branch of this pull request.
    
    After I've addressed the above points, I think it is reasonable to work on 
reducing the number of redundant RPC calls and introducing the 
*persistent-blocking-no back pressure* intermediate result partition variant.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/incubator-flink 
flink-986-runtime_intermediate_results

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #254
    
----
commit 5e879773ed214880b0108238765761cc4391281f
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-10-08T09:40:31Z

    [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for 
intermediate results

commit 2a57f078ee6a16e55be44f5c6ee6c3eb66abf5a3
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-05T15:27:45Z

    [Distributed runtime] Move BroadcastVariableManager to NetworkEnvironment

commit 347eaf9c0000ff59a1bf92551fe53cc9a2d481d3
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-05T15:28:54Z

    [Distributed runtime] Add TODO for intermediate result consumer 
notification RPC calls

commit 00bad69044b801a280fe3dde18671f4ddf184d21
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-05T15:43:05Z

    [Distributed runtime] Log number of update task RPC calls

commit cabd126bc726443770089628036dacae8d06f776
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-05T16:29:58Z

    [Distributed runtime] Fix possible error during release of resources in 
RuntimeEnvironment

commit d46d2961bba82c53ad1c24419a34584c78e80be3
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-05T20:37:15Z

    [Distributed runtime] Minor code cleanup

commit a77a713fa898b4eeaacc19ef4b0758d89e3dbbf9
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-06T12:50:48Z

    [Distributed runtime] Fix checkstyle and visibility errors

commit 491f2f4cb395074669116ed6fd4b8cab810a53fc
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-06T13:05:37Z

    [Distributed runtime] Use ConcurrentLinkedQueue instead of Deque, which is 
only available since Java 1.7

commit 27ed29baf79d655f5e72321f2446428a6e92ab59
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-06T13:21:39Z

    [Distributed runtime] Add lock to iterator subscription to prevent possible 
deadlock

commit 0feca10fb19d8de18eaca35a64c4a08ab30ed1c4
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-07T13:17:09Z

    [Distributed runtime] Fix lifecycle of intermediate result partitions

commit 3d6a26a72fc76e21f936390f3f07ddad8f661069
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-07T13:42:34Z

    [Distributed runtime] Throw proper exception when illegal queue request

commit 200626b6528bf3dc55b29955adc437e18ce64561
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-07T15:24:46Z

    [Distributed runtime] [Tests] Verify buffer reycled when Buffer decoded

commit b049b3190887d6a5e989f51afa89b8e20ebe2614
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-07T15:59:06Z

    [Distributed runtime] Log debug output of partition queue handler

commit 2f9638ba7d84796673913aab847538df96fabc08
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-07T17:01:26Z

    [Distributed runtime] Make BufferReader and BufferWriter final

commit 0a42dfac5517b11f378deb31c78a0111c2e8faaa
Author: Ufuk Celebi <u...@apache.org>
Date:   2014-12-07T20:01:11Z

    [Distributed runtime] Log debug output of client request handler

----


> Add intermediate results to distributed runtime
> -----------------------------------------------
>
>                 Key: FLINK-986
>                 URL: https://issues.apache.org/jira/browse/FLINK-986
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Runtime
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>            Priority: Blocker
>
> Support for intermediate results in the runtime is currently blocking 
> different efforts like fault tolerance or result collection at the client.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
