Yingda Chen created TEZ-3997:
--------------------------------
Summary: Enable CONCURRENT edge
Key: TEZ-3997
URL: https://issues.apache.org/jira/browse/TEZ-3997
Project: Apache Tez
Issue Type: New Feature
Reporter: Yingda Chen
A better formatted (and commentable) google doc with figures can be found at
[https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing]
h1. *Enable CONCURRENT edge in Tez*
h2. *Motivation*
Tez was designed, from day one, to be a unifying framework for building data
processing application[1], with the promise to support different workloads. Yet
the focus on Tez has largely placed on supporting batch data processing, such
as Hive/Pig. For those applications, edge SchedulingType is usually modeled as
SEQUENTIAL, with the between-vertices shuffle implemented on top of Tez APIs.
We believe that there are legitimate needs to fully enable the CONCURRENT
SchedulingType, which break away from the assumption that destination vertex
can only be scheduled after (part of) the the tasks in source vertex have been
completed.
There are various scenarios where CONCURRENT scheduling type can be helpful,
such as the gang scheduling of the whole DAG, or a refined version that
gang-scheduled a sub-graph of a DAG, known as “bubble scheduling”[2]. In
addition, we have found that for Tez to truly unify workloads other than
conventional MR or SQL-like applications, the need of CONCURRENT scheduling
become more pressing. For example, a parameter-server application[3] can be
modeled as a DAG below, where *PS* denotes the vertex that hosts
parameter-servers, and *W* denotes the vertex that hosts workers responsible
for heavy-lifting data-processing. There are two fundamental assumptions that
must be satisfied for parameter-server to work:
see [google
doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing]
for figure
Fig. 1, Parameter-server modeled as a DAG with concurrent edge
# All servers (i.e., all tasks in PS vertex) must be up and running before any
worker (task in W vertex) can make meaningful progress
# All servers must run concurrently with the workers through the lifetime of
the job
Note that one salient common trait shared by the above example is that the
EPHEMERAL data source type comes hand-in-hand with the CONCURRENT scheduling
type. While this is what we have found to be true in many practical scenarios,
the original design in Tez that provides orthogonal DataSourceType and
Scheduling remains more descriptive, and the proposed changes here would keep
that intact.
Overall, we believe that the fundamental design of Tez framework, such as the
pluggale Edge/Vertex managers and versatile edge types, provides the
customizability needed to enable the various scenarios described above, and we
propose to make the following changes.
h2. *Proposed Changes*
To address the above issues, we propose the following changes:
# (No API change) Allow CONCURRENT edge property in DAG construction and
introduce ConcurrentSchedulingType.
# (Some API change/addition) Extend VertexManagerPlugin interface to allow for
relevant events notification.
# Enable downstream vertex connecting to an EPHEMERAL data source, to reason
about network connections of upstream tasks.
# Allow mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex
The details for the proposed changes are provided in the following sections.
* *Allow CONCURRENT edge property in DAG construction and introduce
ConcurrentSchedulingType*
|Note:There is no API change in this proposed change. The majority of this
change will be lifting some existing constraints against CONCURRENT edge type,
and addition of a VertexMangerPlugin implementation.|
This includes enabling the CONCURRENT SchedulingType as a valid edge property,
by removing all the sanity check against CONCURRENT during DAG
construction/execution. A new VertexManagerPlugin (namely
VertexManagerWithConcurrentInput) will be implemented for vertex with incoming
concurrent edge(s).
In addition, we will assume in this change that
* A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges
* No shuffle or data movement is handled by Tez framework when two vertices
are connected through a CONCURRENT edge. Instead, runtime should be responsible
for handling all the data-plane communications (as proposed in [1]).
Note that the above assumptions are common for scenarios such as whole-DAG or
sub-graph gang scheduling, but they may be relaxed in later implementation,
which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
Most of the (meaningful) scheduling decisions today in Tez are made based on
the notion of (or an extended version of) source task completion. This will no
longer be true in presence of CONCURRENT edge. Instead, events such as source
vertex configured, or source task running will become more relevant when making
scheduling decision for two vertices connected via a CONCURRENT edge. We
therefore introduce a new enum *ConcurrentSchedulingType* to describe the
“scheduling timing” for the downstream vertex in such scenarios.
|public enum ConcurrentSchedulingType{
/** * trigger downstream vertex tasks scheduling by "configured" event of
upstream vertices */
SOURCE_VERTEX_CONFIGURED,
/** * trigger downstream vertex tasks scheduling by "running" event of
upstream tasks */
SOURCE_TASK_STARTED
}|
Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the
scheduling type, which suffice for scenarios of whole-DAG or sub-graph
gang-scheduling, where we want (all the tasks in) the downstream vertex to be
scheduled together with (all the tasks) in the upstream vertex. In this case,
we can leverage the existing onVertexStateUpdated() interface of
VextexMangerPlugin to collect relevant information to assist the scheduling
decision, and *there is no additional API change necessary*. However, in more
subtle case such as the parameter-server example described in Fig. 1, other
scheduling type would be more relevant, therefore the placeholder for
*ConcurrentSchedulingType* will be introduced in this change as part of the
infrastructure work.
Finally, since we assume that all communications between two vertices connected
via CONCURRENT edge are handled by application runtime, a CONCURRENT edge will
be assigned a DummyEdgeManager that basically mute all DME/VME handling.
# *Extend VertexManagerPlugin interface to allow for relevant events
notification*
For concurrent connection, the downstream and upstream vertices would be
running concurrently, and in some cases, they would be scheduled at the same
time as well, such as (sub-graph) gang scheduling. However, *this is not always
true*. In the example in Fig. 1, tasks in PS vertex should be running before
tasks in W vertex should be scheduled. Since otherwise if the resource requests
for PS cannot be fulfilled first, W will be spinning in vain. In other
examples, as long as part of tasks in upstream vertex are running, we can start
scheduling downstream tasks.
In other words, if we put this into the context of existing
interface/implementation of VertexMangerPlugin, we can see strong duality of
“OnSourceTaskRunning” for concurrent connection vs the “OnSourceTaskCompleted”
for (existing) sequential connection. Therefore, we propose an addition of
“_onConcurrentSourceTaskRunning(TaskAttemptIdentifer attempt)_” interface to
the VertexManager Plugin, with default implementation being not supported.
This change will also include the logic to add source task running event and to
send such events to downstream vertices. To reduce unnecessary event traffic,
we will limit the sending of such events to CONCURRENT edge, and when the
ConcurrentSchedulingType is specified to be SOURCE_TASK_STARTED .
# *Enable downstream vertex connecting to an EPHEMERAL data source, to reason
about network connections of upstream tasks.*
Another property that is usually shared with CONCURRENT on the same edge is
EPHEMERAL data source. When two vertices are running concurrently, direct
communications between tasks in those vertices become possible, and oftentimes
necessary, throughout the lifetime of the running task. This can be articulated
by an EPHEMERAL data sources, and this change aims to support such scenarios,
which are readily found in real-time applications (such as interactive query)
and/or customized applications that would like to control their own data
communications (such as parameter-server).
This change will allow Tez to be the central orchestrator that gathers
necessary network information from all upstream tasks, compiles them together
and send it to downstream tasks. Particularly, the following changes are
planned:
# For two vertices connected via an edge with both CONCURRENT scheduling type
and EPHEMERAL data source type, the task in upstream vertex will open network
port, and send an VertexManagerEvent(VME) immediately upon running. The payload
of VME includes necessary information to communicate to this task through
direct network communication (such as ip and port). The vertex manager of the
downstream vertex, typed VertexManagerWithConcurrentInputs, will receive these
VMEs, and are responsible for aggregate (including de-dup if necessary) all
information together in onVertexManagerEventReceived().
# Once all VMEs have been received, a CustomProcessorEvent will be constructed
with a payload that includes the aggregated information, and be routed to
downstream tasks.
The change will introduce additional optional entries in
VertexManagerEventPayload and a new custom payload that will be embedded into
CustomProcessorEvent.
Upon completion of functional feature in this change, additional feature such
as handling of failover in CONCURRENT/EPHEMERAL edge will be addressed in
future umbrea JIRAs.
# *Allow mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex*
In the above two changes, we assume that a vertex’s incoming edges should have
the same edge property in terms of Scheduling Type, i.e., they are either all
SEQUENTIAL, or, all CONCURRENT.
We shall extend beyond this assumption in this change to allow mixture of
different incoming edge types, as exemplified in Fig.2.
see [google
doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing]
for figure
Fig. 2 Vertex with mixture of input edges
This change will mainly focus on enriching the VertexMangagerPlugin
implementation that we introduced in our first change, namely, the
VertexManagerWithConcurrentInputs. No API change is expected with this change.
h2. Reference
[1] Apache Tez: A Unifying Framework for Modeling and Building Data Processing
Applications SIGMOD’15 http://dl.acm.org/authorize?N97131
[2] Bubble Execution: Resource-aware Reliable Analytics at Cloud Scale , VLDB
2018 [http://www.vldb.org/pvldb/vol11/p746-yin.pdf]
[3] [https://www.tensorflow.org/deploy/distributed]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)