GitHub user uce opened a pull request:
https://github.com/apache/incubator-flink/pull/254
[FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for
intermediate results
## FLINK-25: Offer buffer-oriented runtime abstraction
- The distributed runtime API was originally record-oriented. The output
side has previously been refactored to a buffer-oriented API (`BufferWriter`).
With this pull request, the input side offers a similar interface:
`BufferReader`.
- This enables us to work directly on the serialized data in the future. Previously, this was impossible, as buffers were deserialized immediately after being handed over to an input channel.
- Currently, the buffer-oriented readers and writers are always wrapped by
record-oriented readers and writers. The deserialization logic of the input
channels has essentially moved to the readers.
- The way of registering I/O has changed. The life-cycle of each task (see `AbstractInvokable`) involves two methods: `registerInputOutput()` and `invoke()`. The I/O setup used to be coupled with the creation of readers/writers in `registerInputOutput()`, although the required information is independent of it. Therefore, readers and writers are now instantiated directly by the runtime environment and can be accessed via `getEnvironment().getReader(int)` and `getEnvironment().getWriter(int)`, respectively (see the sketch below).
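To illustrate the new life-cycle, here is a minimal sketch of a task that picks up the runtime-provided readers/writers in `registerInputOutput()` and wraps them with record-oriented views. The wrapper names (`RecordReader`, `RecordWriter`) and their constructors are assumptions for illustration, not necessarily the exact classes or signatures in this pull request.

```
// Hypothetical example task; the record-oriented wrappers are illustrative only.
public class SumTask extends AbstractInvokable {

    private RecordReader<IntValue> input;   // record-oriented view over a BufferReader
    private RecordWriter<IntValue> output;  // record-oriented view over a BufferWriter

    @Override
    public void registerInputOutput() {
        // The runtime environment has already instantiated the buffer-oriented
        // reader and writer; the task only wraps them.
        BufferReader reader = getEnvironment().getReader(0);
        BufferWriter writer = getEnvironment().getWriter(0);

        this.input = new RecordReader<IntValue>(reader, IntValue.class);
        this.output = new RecordWriter<IntValue>(writer);
    }

    @Override
    public void invoke() throws Exception {
        int sum = 0;
        IntValue value;
        while ((value = input.next()) != null) {
            sum += value.getValue();
        }
        output.emit(new IntValue(sum));
        output.flush();
    }
}
```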
## FLINK-986: Add initial support for intermediate results
- This commit introduces the abstraction of intermediate results into the
distributed runtime. It is essentially the runtime-part of @StephanEwen's
rework of the scheduler/execution graph
([FLINK-1094](https://issues.apache.org/jira/browse/FLINK-1094)), which made
the scheduler/execution graph aware of intermediate results. Job graphs used to
essentially look like this: `Source => Task i => ... => Sink`. With Stephan's
rework, they already look like this: `Task:Intermediate Result <= Task
i:Intermediate Result <= ... <= Task:Intermediate Result`. Tasks produce
intermediate results and attach to them for consumption. These changes logically decoupled produced results and their consumers in the job graph. At the runtime level, however, the intermediate results were still tightly coupled to the producing and consuming tasks. With this pull request, the producing task, the produced intermediate result, and the consuming task are decoupled as well.
- Previously, the network stack was responsible for packaging buffers and immediately dispatching them to their receivers in a push fashion. The first buffer for a receiver resulted in the scheduling of that receiver. With the new model, we gain much more flexibility in the way we can produce and consume these intermediate results. The current state is feature-equivalent with the current master, i.e. the produced intermediate result is pipelined and not persistent. I ran a few performance comparisons and the performance was in the same ballpark as the current master, although it was (consistently) slightly slower.
### Changes
#### Buffer management/The life of a Buffer
- The buffers for the network stack are allocated on the heap by each task manager's `NetworkBufferPool` (previously `GlobalBufferPool`) at startup time. These network buffers are divided uniformly at runtime among the tasks. The current default for this pool is 2048 buffers (each 32 KB), resulting in a total of 64 MB. Intermediate results get their buffers from this buffer pool. Therefore, for any in-memory persistent data set to make sense, we would have to give the network pool a lot more memory. I think that this is currently not desirable as our memory management is static, i.e. memory given to the network pool would be missing for operators like sorting or hashing. In my opinion, adaptive memory management ([FLINK-1101](https://issues.apache.org/jira/browse/FLINK-1101)) is therefore a requirement before continuing with more runtime intermediate result variants (see below).
- The buffer management for pipelined intermediate results has essentially not changed. We need at least a single buffer per outgoing channel (or queue, in the terms of the new runtime). When a task is submitted to a task manager, the network buffers are divided as follows: a single buffer pool **per produced result** (previously one for all produced results) and one pool **per consumed result** (unchanged). A sketch of this accounting follows below.
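The following is a back-of-the-envelope sketch of that accounting, not the actual `NetworkBufferPool` code. Only the default numbers (2048 buffers of 32 KB) come from the description above; the class, the field names, and the uniform split between local pools are simplifying assumptions.

```
// Illustrative only: fixed network memory divided into one local pool per
// produced result and one per consumed result.
public class NetworkBufferAccounting {

    static final int TOTAL_BUFFERS = 2048;     // default pool size mentioned above
    static final int SEGMENT_SIZE = 32 * 1024; // 32 KB per buffer => 64 MB in total

    public static void main(String[] args) {
        int producedResults = 1; // e.g. one produced intermediate result
        int consumedResults = 1; // e.g. one consumed intermediate result
        int localPools = producedResults + consumedResults;

        // Assumption: the buffers available to this task are split uniformly.
        int buffersPerPool = TOTAL_BUFFERS / localPools;

        System.out.printf("total network memory: %d MB%n",
                (long) TOTAL_BUFFERS * SEGMENT_SIZE / (1024 * 1024));
        System.out.printf("%d local pools with %d buffers (%d MB) each%n",
                localPools, buffersPerPool,
                (long) buffersPerPool * SEGMENT_SIZE / (1024 * 1024));
    }
}
```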
- A produced buffer used to take the following high-level path: `BufferWriter => ChannelManager => NETWORK xor local InputChannel`. Now it is `BufferWriter => IntermediateResult`. What happens at the intermediate result is flexible, depending on the `IntermediateResultType`. Early discussions with @StephanEwen suggested the following three dimensions:
  1. persistent/ephemeral,
  2. pipelined/blocking, and
  3. back pressure/no back pressure.

  The current state offers an *ephemeral-pipelined-back pressure* implementation, which matches the behavior of the current master; a sketch of these dimensions follows below. (I have removed code for a *persistent-blocking-no back pressure* variant for now.)
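To make the three dimensions a bit more tangible, here is a hedged sketch of how an `IntermediateResultType` could encode them. This is an illustration under the assumptions above, not necessarily the enum shipped with this pull request.

```
// Illustrative sketch of the three dimensions of an intermediate result type.
public enum IntermediateResultType {

    // What this pull request implements: buffers are forwarded as they are
    // produced, results live only as long as they are consumed, and a slow
    // consumer slows down the producer.
    EPHEMERAL_PIPELINED_BACK_PRESSURE(false, true, true),

    // Removed for now, planned as a follow-up: results are materialized,
    // consumption starts after the last buffer, no back pressure on the producer.
    PERSISTENT_BLOCKING_NO_BACK_PRESSURE(true, false, false);

    private final boolean persistent;   // persistent vs. ephemeral
    private final boolean pipelined;    // pipelined vs. blocking
    private final boolean backPressure; // back pressure vs. no back pressure

    IntermediateResultType(boolean persistent, boolean pipelined, boolean backPressure) {
        this.persistent = persistent;
        this.pipelined = pipelined;
        this.backPressure = backPressure;
    }

    public boolean isPersistent() { return persistent; }

    public boolean isPipelined() { return pipelined; }

    public boolean hasBackPressure() { return backPressure; }
}
```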
#### Intermediate results, intermediate result partitions and intermediate
result partition queues
- As noted above, tasks at the job graph level are associated with intermediate results. When scheduling these graphs, the intermediate result is divided into result partitions (associated with execution vertices). Each parallel sub task (execution vertex) produces one partition of the result (and possibly partitions of multiple intermediate results).
- At the runtime level, these partitions are further divided into queues depending on the degree of parallelism of the consumer. For example, a map-reduce program with a degree of parallelism of 2 might look like this on the map side (a code sketch of this layout follows the diagram):
```
                                             +---------+
+-------+               +-------------+  +=> | Queue 1 |
| Map 1 | = produces => | Partition 1 | =|   +---------+
+-------+               +-------------+  +=> | Queue 2 |
                                             +---------+
                                             +---------+
+-------+               +-------------+  +=> | Queue 1 |
| Map 2 | = produces => | Partition 2 | =|   +---------+
+-------+               +-------------+  +=> | Queue 2 |
                                             +---------+
```
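As a rough sketch of this layout (hypothetical classes, not the code in this pull request): each produced partition keeps one queue per consumer sub task, and the producer routes a buffer to a queue by the index chosen by the partitioner.

```
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: one partition of an intermediate result with one queue
// per consumer sub task.
public class IntermediateResultPartitionSketch<T> {

    private final List<BlockingQueue<T>> queues;

    public IntermediateResultPartitionSketch(int numberOfQueues, int buffersPerQueue) {
        this.queues = new ArrayList<BlockingQueue<T>>(numberOfQueues);
        for (int i = 0; i < numberOfQueues; i++) {
            queues.add(new ArrayBlockingQueue<T>(buffersPerQueue));
        }
    }

    // The producer adds a buffer to the queue selected by the partitioner.
    // A full queue blocks the producer, which gives us back pressure.
    public void add(T buffer, int targetQueueIndex) throws InterruptedException {
        queues.get(targetQueueIndex).put(buffer);
    }

    // A consumer sub task requests "its" queue by index.
    public BlockingQueue<T> getQueue(int queueIndex) {
        return queues.get(queueIndex);
    }
}
```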
#### Scheduling of consumers and receiver-initiated sending
- Depending on the intermediate result type (pipelined vs. blocking), it is necessary to deploy the consumers either when the first buffer is produced or only after all buffers have been produced. The receivers then request the respective intermediate result partition queue from the task managers where the partitions were produced. For a data repartitioning as in a map-reduce, each sub task would request the queue matching its sub task number (see also the sketch after this item), e.g.:
```
                                             +---------+              +----------+
+-------+               +-------------+  +=> | Queue 1 | <=======+=== requests = | Reduce 1 |
| Map 1 | = produces => | Partition 1 | =|   +---------+         |    +----------+
+-------+               +-------------+  +=> | Queue 2 | <==+    |
                                             +---------+    |    |
                                                            |    |
                                             +---------+    |    |
+-------+               +-------------+  +=> | Queue 1 | <==+====+
| Map 2 | = produces => | Partition 2 | =|   +---------+    |                    +----------+
+-------+               +-------------+  +=> | Queue 2 | <==+======== requests = | Reduce 2 |
                                             +---------+              +----------+
```
If the partitioning is not known or a task has no known consumers, the partition will consist of a single queue (or of as many queues as the default degree of parallelism). When it is consumed, a re-partitioning task needs to be inserted.
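Continuing the hypothetical sketch from above, receiver-initiated consumption then boils down to each consumer sub task asking for the queue that matches its own index in every produced partition; in the real runtime each of these requests would go, possibly over the network, to the task manager holding the partition.

```
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Illustrative only: a consumer sub task gathers "its" queue from every
// produced partition of the intermediate result it consumes.
public class QueueRequestSketch {

    public static <T> List<BlockingQueue<T>> requestQueues(
            List<IntermediateResultPartitionSketch<T>> producedPartitions,
            int consumerSubtaskIndex) {

        List<BlockingQueue<T>> queues = new ArrayList<BlockingQueue<T>>();
        for (IntermediateResultPartitionSketch<T> partition : producedPartitions) {
            // E.g. Reduce 1 requests Queue 1 of Partition 1 and of Partition 2.
            queues.add(partition.getQueue(consumerSubtaskIndex));
        }
        return queues;
    }
}
```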
- The scheduling of consumers requires notifications to the central job
manager, which can then deploy the consumers. For a single produced *pipelined*
intermediate result, this results in `number of produced partitions` RPC calls
to the job manager (see `ConsumerNotificationProtocol`). The consumer tasks can
then be deployed with the necessary information to request the respective
queues. This results in `number of consumer sub tasks` task deployment RPC
calls. The first notification from a task manager results in the deployment of
the receiver. It is possible that not all producer task locations are known at
deployment time of a consumer task. At the moment, we follow the naive approach
of sending task update RPC calls with information about the location of the
producer for every partition. This currently results in a total of `N * M` RPC calls per produced intermediate result (where `N` is the number of produced partitions and `M` is the number of consumer tasks); a back-of-the-envelope example follows below.
- For blocking results, this can be reduced to `2*N` as we are guaranteed
to know all producer locations when all partitions are finished. It should also
be possible to further reduce this number to the number of task managers by
collecting the notifications per task manager and then sending them to the job
manager at once.
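For the map-reduce example above (assuming `N = 2` produced partitions and `M = 2` consumer sub tasks), the counts work out as follows; this is just arithmetic over the numbers stated above, not actual runtime code.

```
// Back-of-the-envelope RPC counts for the example above (illustrative only).
public class RpcCallCount {

    public static void main(String[] args) {
        int n = 2; // produced partitions (map sub tasks)
        int m = 2; // consumer sub tasks (reduce sub tasks)

        // Pipelined result, naive scheme: one consumer notification per produced
        // partition, one deployment per consumer sub task, and one update-task
        // call per (partition, consumer) pair.
        int consumerNotifications = n;     // 2
        int consumerDeployments = m;       // 2
        int updateTaskCalls = n * m;       // 4

        // Blocking result: all producer locations are known once the result is
        // finished, so the calls can be reduced to 2 * N.
        int blockingCalls = 2 * n;         // 4

        System.out.printf("pipelined: %d notifications, %d deployments, %d updates%n",
                consumerNotifications, consumerDeployments, updateTaskCalls);
        System.out.printf("blocking: %d calls%n", blockingCalls);
    }
}
```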
#### More robust shuffle
- Shuffles were previously managed by a large per-task-manager component called the `ChannelManager`, which kept track of all channels. This component was huge and responsible for many different things (network buffers, channel lookup, buffer routing, TCP connections). It has been refactored into more localized components, in which resource acquisition and release are handled in the same place; in my opinion this is much easier to follow and understand.
- In particular, we had issues with the release of resources like channel lookup tables or TCP connections, which have been addressed with this pull request as well. The Netty-based network code, especially, was not reliable in the way it propagated errors to the tasks and released all resources. Addressing this was a requirement for adding reliable fault-tolerance mechanisms in the future.
### Next steps
The goal is to merge the current state as soon as possible and then start to extend it (I've removed some code that was already in place). I could have done this way earlier and intend **not** to make that mistake again.
1. I've only tested on a small cluster (4 nodes) with a low degree of parallelism (32). I have to test further on a larger cluster with a higher degree of parallelism. I also haven't tested iterative programs yet. After doing this, I will also post the numbers.
2. I've already discovered a few problems in corner cases, which I need to
fix before we review this.
3. Some tests are failing and I've disabled a few others, which I will
adjust and enable again.
I intend to address these points over the course of this week, working on the branch of this pull request. After that, I find it reasonable to work on reducing the number of redundant RPC calls and on introducing the *persistent-blocking-no back pressure* intermediate result partition variant.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/incubator-flink
flink-986-runtime_intermediate_results
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-flink/pull/254.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #254
----
commit 5e879773ed214880b0108238765761cc4391281f
Author: Ufuk Celebi <[email protected]>
Date: 2014-10-08T09:40:31Z
[FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for
intermediate results
commit 2a57f078ee6a16e55be44f5c6ee6c3eb66abf5a3
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-05T15:27:45Z
[Distributed runtime] Move BroadcastVariableManager to NetworkEnvironment
commit 347eaf9c0000ff59a1bf92551fe53cc9a2d481d3
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-05T15:28:54Z
[Distributed runtime] Add TODO for intermediate result consumer
notification RPC calls
commit 00bad69044b801a280fe3dde18671f4ddf184d21
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-05T15:43:05Z
[Distributed runtime] Log number of update task RPC calls
commit cabd126bc726443770089628036dacae8d06f776
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-05T16:29:58Z
[Distributed runtime] Fix possible error during release of resources in
RuntimeEnvironment
commit d46d2961bba82c53ad1c24419a34584c78e80be3
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-05T20:37:15Z
[Distributed runtime] Minor code cleanup
commit a77a713fa898b4eeaacc19ef4b0758d89e3dbbf9
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-06T12:50:48Z
[Distributed runtime] Fix checkstyle and visibility errors
commit 491f2f4cb395074669116ed6fd4b8cab810a53fc
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-06T13:05:37Z
[Distributed runtime] Use ConcurrentLinkedQueue instead of Deque, which is
only available since Java 1.7
commit 27ed29baf79d655f5e72321f2446428a6e92ab59
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-06T13:21:39Z
[Distributed runtime] Add lock to iterator subscription to prevent possible
deadlock
commit 0feca10fb19d8de18eaca35a64c4a08ab30ed1c4
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-07T13:17:09Z
[Distributed runtime] Fix lifecycle of intermediate result partitions
commit 3d6a26a72fc76e21f936390f3f07ddad8f661069
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-07T13:42:34Z
[Distributed runtime] Throw proper exception when illegal queue request
commit 200626b6528bf3dc55b29955adc437e18ce64561
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-07T15:24:46Z
[Distributed runtime] [Tests] Verify buffer reycled when Buffer decoded
commit b049b3190887d6a5e989f51afa89b8e20ebe2614
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-07T15:59:06Z
[Distributed runtime] Log debug output of partition queue handler
commit 2f9638ba7d84796673913aab847538df96fabc08
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-07T17:01:26Z
[Distributed runtime] Make BufferReader and BufferWriter final
commit 0a42dfac5517b11f378deb31c78a0111c2e8faaa
Author: Ufuk Celebi <[email protected]>
Date: 2014-12-07T20:01:11Z
[Distributed runtime] Log debug output of client request handler
----