Yingda Chen created TEZ-3997:
--------------------------------

             Summary: Enable CONCURRENT edge
                 Key: TEZ-3997
                 URL: https://issues.apache.org/jira/browse/TEZ-3997
             Project: Apache Tez
          Issue Type: New Feature
            Reporter: Yingda Chen


A better formatted (and commentable) google doc with figures can be found at 

[https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing]

 
h1. *Enable CONCURRENT edge in Tez*

 
h2. *Motivation*

Tez was designed, from day one, to be a unifying framework for building data 
processing applications [1], with the promise to support different workloads. 
Yet the focus of Tez has largely been placed on supporting batch data 
processing, such as Hive/Pig. For those applications, the edge SchedulingType 
is usually modeled as SEQUENTIAL, with the between-vertices shuffle implemented 
on top of Tez APIs. We believe that there are legitimate needs to fully enable 
the CONCURRENT SchedulingType, which breaks away from the assumption that a 
destination vertex can only be scheduled after (part of) the tasks in the 
source vertex have completed. 

There are various scenarios where the CONCURRENT scheduling type can be 
helpful, such as gang scheduling of the whole DAG, or a refined version that 
gang-schedules a sub-graph of a DAG, known as “bubble scheduling” [2]. In 
addition, we have found that for Tez to truly unify workloads other than 
conventional MR or SQL-like applications, the need for CONCURRENT scheduling 
becomes more pressing. For example, a parameter-server application [3] can be 
modeled as the DAG below, where *PS* denotes the vertex that hosts 
parameter servers, and *W* denotes the vertex that hosts workers responsible 
for the heavy-lifting data processing. There are two fundamental assumptions 
that must be satisfied for a parameter server to work:

 

see [google 
doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing]
 for figure

Fig. 1, Parameter-server modeled as a DAG with concurrent edge

 
 # All servers (i.e., all tasks in PS vertex) must be up and running before any 
worker (task in W vertex) can make meaningful progress
 # All servers must run concurrently with the workers through the lifetime of 
the job

 

Note that one salient trait of the above example is that the EPHEMERAL data 
source type comes hand-in-hand with the CONCURRENT scheduling type. While this 
is what we have found to be true in many practical scenarios, the original 
design in Tez, which keeps DataSourceType and SchedulingType orthogonal, 
remains more descriptive, and the proposed changes here would keep that intact. 

 

Overall, we believe that the fundamental design of the Tez framework, such as 
the pluggable Edge/Vertex managers and versatile edge types, provides the 
customizability needed to enable the various scenarios described above, and we 
propose to make the following changes.

 
h2. *Proposed Changes*

To address the above issues, we propose the following changes:
 # (No API change) Allow the CONCURRENT edge property in DAG construction and 
introduce ConcurrentSchedulingType.
 # (Some API change/addition) Extend the VertexManagerPlugin interface to allow 
notification of relevant events.
 # Enable a downstream vertex connected to an EPHEMERAL data source to reason 
about the network connections of upstream tasks. 
 # Allow a mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex.

 

The details for the proposed changes are provided in the following sections.
 #  *Allow CONCURRENT edge property in DAG construction and introduce 
ConcurrentSchedulingType*

 
|Note: There is no API change in this proposed change. The majority of this 
change consists of lifting some existing constraints against the CONCURRENT 
edge type, and adding a VertexManagerPlugin implementation.|

 

This includes enabling the CONCURRENT SchedulingType as a valid edge property, 
by removing all the sanity checks against CONCURRENT during DAG 
construction/execution. A new VertexManagerPlugin (namely 
VertexManagerWithConcurrentInput) will be implemented for vertices with 
incoming concurrent edge(s). 

In addition, we will assume in this change that 
 * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges. 
 * No shuffle or data movement is handled by the Tez framework when two 
vertices are connected through a CONCURRENT edge. Instead, the application 
runtime is responsible for handling all the data-plane communications (as 
proposed in [1]).

Note that the above assumptions are common for scenarios such as whole-DAG or 
sub-graph gang scheduling, but they may be relaxed in a later implementation, 
which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
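
The first assumption can be pictured as a simple check over a vertex's incoming edges. The following is only a minimal sketch: the enum and class names are hypothetical stand-ins chosen for illustration and are not Tez classes, and where such a check would live in Tez is not specified by this proposal.

```java
import java.util.EnumSet;
import java.util.List;

/** Hypothetical stand-in for the edge scheduling type; not the Tez class. */
enum EdgeSchedulingSketch { SEQUENTIAL, CONCURRENT }

/** Hypothetical helper illustrating the "no mixed incoming edges" assumption. */
final class IncomingEdgeCheckSketch {
  private IncomingEdgeCheckSketch() {}

  /** Returns true if all incoming edges share one scheduling type (or there are none). */
  static boolean isHomogeneous(List<EdgeSchedulingSketch> incomingEdges) {
    EnumSet<EdgeSchedulingSketch> seen = EnumSet.noneOf(EdgeSchedulingSketch.class);
    seen.addAll(incomingEdges);
    return seen.size() <= 1;
  }
}
```

Under this sketch, a vertex with two CONCURRENT incoming edges passes the check, while a vertex with one SEQUENTIAL and one CONCURRENT incoming edge does not.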

 

Most of the (meaningful) scheduling decisions in Tez today are made based on 
the notion of (or an extended version of) source task completion. This will no 
longer be true in the presence of a CONCURRENT edge. Instead, events such as 
source vertex configured, or source task running, will become more relevant 
when making scheduling decisions for two vertices connected via a CONCURRENT 
edge. We therefore introduce a new enum *ConcurrentSchedulingType* to describe 
the “scheduling timing” for the downstream vertex in such scenarios. 
|public enum ConcurrentSchedulingType {
  /** Trigger downstream vertex task scheduling by the "configured" event of upstream vertices. */
  SOURCE_VERTEX_CONFIGURED,
  /** Trigger downstream vertex task scheduling by the "running" event of upstream tasks. */
  SOURCE_TASK_STARTED
}|

 

Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the 
scheduling type, which suffices for scenarios of whole-DAG or sub-graph 
gang scheduling, where we want (all the tasks in) the downstream vertex to be 
scheduled together with (all the tasks in) the upstream vertex. In this case, 
we can leverage the existing onVertexStateUpdated() interface of 
VertexManagerPlugin to collect relevant information to assist the scheduling 
decision, and *there is no additional API change necessary*. However, in more 
subtle cases such as the parameter-server example described in Fig. 1, other 
scheduling types would be more relevant; therefore the placeholder for 
*ConcurrentSchedulingType* will be introduced in this change as part of the 
infrastructure work.
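
To make the SOURCE_VERTEX_CONFIGURED case concrete, here is a minimal sketch of the gating logic a vertex manager might apply. It is an assumption-laden illustration: the class and method names are hypothetical, vertices are identified by plain strings, and the callback only mirrors the role that onVertexStateUpdated() would play rather than using the Tez interface itself.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of gang scheduling triggered by "source vertex configured". */
final class ConfiguredGateSketch {
  private final Set<String> pendingSources;          // sources not yet configured
  private final int numTasks;                        // tasks in the downstream vertex
  private final List<Integer> scheduled = new ArrayList<>();

  ConfiguredGateSketch(Set<String> sourceVertexNames, int numTasks) {
    this.pendingSources = new HashSet<>(sourceVertexNames);
    this.numTasks = numTasks;
  }

  /** Plays the role of onVertexStateUpdated() for a CONFIGURED state update. */
  void onSourceVertexConfigured(String vertexName) {
    pendingSources.remove(vertexName);
    // Schedule every downstream task at once, and only once, when all
    // concurrent sources have reported that they are configured.
    if (pendingSources.isEmpty() && scheduled.isEmpty()) {
      for (int i = 0; i < numTasks; i++) {
        scheduled.add(i);
      }
    }
  }

  /** Task indices that have been handed to the scheduler so far. */
  List<Integer> scheduledTasks() {
    return new ArrayList<>(scheduled);
  }
}
```

With two source vertices, nothing is scheduled after the first notification, and all downstream tasks are scheduled together after the second; repeated notifications do not schedule tasks again.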

 

Finally, since we assume that all communications between two vertices connected 
via a CONCURRENT edge are handled by the application runtime, a CONCURRENT edge 
will be assigned a DummyEdgeManager that essentially mutes all 
DataMovementEvent (DME) and VertexManagerEvent (VME) handling.
 #  *Extend VertexManagerPlugin interface to allow for relevant events 
notification*

For a concurrent connection, the downstream and upstream vertices run 
concurrently, and in some cases, such as (sub-graph) gang scheduling, they are 
scheduled at the same time as well. However, *this is not always true*. In the 
example in Fig. 1, tasks in the PS vertex should be running before tasks in the 
W vertex are scheduled; otherwise, if the resource requests for PS cannot be 
fulfilled first, W will be spinning in vain. In other examples, as long as part 
of the tasks in the upstream vertex are running, we can start scheduling 
downstream tasks. 

 

In other words, if we put this into the context of the existing 
interface/implementation of VertexManagerPlugin, we can see a strong duality 
between “OnSourceTaskRunning” for a concurrent connection and 
“OnSourceTaskCompleted” for an (existing) sequential connection. Therefore, we 
propose adding an 
“_onConcurrentSourceTaskRunning(TaskAttemptIdentifier attempt)_” interface to 
the VertexManagerPlugin, with the default implementation being not supported.
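
The shape of this addition can be sketched as follows. This is a simplified, hypothetical stand-in and not the Tez interface: the interface and class names are invented for illustration, task attempts are represented as plain strings instead of TaskAttemptIdentifier, and the choice of UnsupportedOperationException for "not supported" is an assumption. The second class illustrates the Fig. 1 case, where W is scheduled only once every PS task is running.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for the plugin interface. */
interface VertexManagerSketch {
  void onSourceTaskCompleted(String sourceTaskId);

  /** Proposed callback; unsupported unless a plugin opts in by overriding it. */
  default void onConcurrentSourceTaskRunning(String sourceTaskId) {
    throw new UnsupportedOperationException(
        "concurrent source task running events are not supported");
  }
}

/** A sequential-style manager that keeps the default behaviour. */
final class SequentialManagerSketch implements VertexManagerSketch {
  private int completed = 0;

  @Override
  public void onSourceTaskCompleted(String sourceTaskId) {
    completed++;
  }

  int completedCount() {
    return completed;
  }
}

/** Manager for W in Fig. 1: schedules workers once all PS tasks are running. */
final class WorkerManagerSketch implements VertexManagerSketch {
  private final int numServerTasks;
  private final Set<String> runningServers = new HashSet<>();
  private boolean workersScheduled = false;

  WorkerManagerSketch(int numServerTasks) {
    this.numServerTasks = numServerTasks;
  }

  @Override
  public void onSourceTaskCompleted(String sourceTaskId) {
    // Completion of PS tasks does not drive scheduling in this sketch.
  }

  @Override
  public void onConcurrentSourceTaskRunning(String sourceTaskId) {
    runningServers.add(sourceTaskId); // a set ignores repeated notifications
    if (!workersScheduled && runningServers.size() == numServerTasks) {
      workersScheduled = true;
    }
  }

  boolean workersScheduled() {
    return workersScheduled;
  }
}
```

Keeping the new callback as a default method means existing plugins compile unchanged, which matches the intent that only managers for concurrent inputs need to implement it.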

 

This change will also include the logic to add a source task running event and 
to send such events to downstream vertices. To reduce unnecessary event 
traffic, we will send such events only on CONCURRENT edges, and only when the 
ConcurrentSchedulingType is specified to be SOURCE_TASK_STARTED.
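
The filtering rule above amounts to a two-part condition. A minimal sketch, assuming a local copy of the proposed ConcurrentSchedulingType enum and a hypothetical enum for the edge scheduling type (neither is the Tez class, and the helper name is invented):

```java
/** Hypothetical stand-in for the edge scheduling type. */
enum EdgeKindSketch { SEQUENTIAL, CONCURRENT }

/** Local copy of the enum proposed in this issue, for illustration only. */
enum ConcurrentSchedulingType { SOURCE_VERTEX_CONFIGURED, SOURCE_TASK_STARTED }

/** Hypothetical helper deciding whether a "source task running" event is sent. */
final class RunningEventFilterSketch {
  private RunningEventFilterSketch() {}

  static boolean shouldSendRunningEvent(EdgeKindSketch edge, ConcurrentSchedulingType timing) {
    // Both conditions must hold; every other combination produces no event.
    return edge == EdgeKindSketch.CONCURRENT
        && timing == ConcurrentSchedulingType.SOURCE_TASK_STARTED;
  }
}
```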

 
 #  *Enable downstream vertex connecting to an EPHEMERAL data source, to reason 
about network connections of upstream tasks.*

Another property that usually accompanies CONCURRENT on the same edge is the 
EPHEMERAL data source. When two vertices are running concurrently, direct 
communications between tasks in those vertices become possible, and oftentimes 
necessary, throughout the lifetime of the running tasks. This can be expressed 
by an EPHEMERAL data source, and this change aims to support such scenarios, 
which are readily found in real-time applications (such as interactive query) 
and/or customized applications that would like to control their own data 
communications (such as parameter-server).

 

This change will allow Tez to be the central orchestrator that gathers the 
necessary network information from all upstream tasks, compiles it together, 
and sends it to downstream tasks. In particular, the following changes are 
planned:
 # For two vertices connected via an edge with both CONCURRENT scheduling type 
and EPHEMERAL data source type, each task in the upstream vertex will open a 
network port and send a VertexManagerEvent (VME) immediately upon running. The 
payload of the VME includes the information necessary to reach this task 
through direct network communication (such as IP and port). The vertex manager 
of the downstream vertex, of type VertexManagerWithConcurrentInput, will 
receive these VMEs and is responsible for aggregating (including de-duplicating 
if necessary) all the information in onVertexManagerEventReceived(). 
 # Once all VMEs have been received, a CustomProcessorEvent will be constructed 
with a payload that includes the aggregated information, and will be routed to 
downstream tasks.
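
The two steps above can be sketched as a small aggregator. This is an illustration under stated assumptions, not the planned implementation: the class and method names are hypothetical, endpoints are plain "host:port" strings keyed by upstream task index, a later report for the same task simply replaces the earlier one as the de-duplication rule, and the aggregated payload is a comma-separated string rather than the payload format that the change will define.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical sketch of aggregating upstream endpoints before notifying downstream tasks. */
final class EndpointAggregatorSketch {
  private final int expectedTasks;                               // upstream task count
  private final Map<Integer, String> endpoints = new TreeMap<>(); // task index -> host:port
  private boolean emitted = false;

  EndpointAggregatorSketch(int expectedTasks) {
    this.expectedTasks = expectedTasks;
  }

  /**
   * Plays the role of onVertexManagerEventReceived(). Returns the aggregated
   * payload exactly once, when every upstream task has reported; otherwise empty.
   */
  Optional<String> onEndpointReceived(int taskIndex, String hostPort) {
    endpoints.put(taskIndex, hostPort); // de-dup: one entry per upstream task
    if (emitted || endpoints.size() < expectedTasks) {
      return Optional.empty();
    }
    emitted = true;
    // Stand-in for building the CustomProcessorEvent payload.
    return Optional.of(String.join(",", endpoints.values()));
  }
}
```

Keying by task index rather than by attempt keeps a single endpoint per upstream task in this sketch; how re-attempts and failover should be handled is left to the follow-up work mentioned below.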

The change will introduce additional optional entries in 
VertexManagerEventPayload and a new custom payload that will be embedded into 
CustomProcessorEvent. 

 

Upon completion of the functional features in this change, additional features 
such as handling of failover on CONCURRENT/EPHEMERAL edges will be addressed in 
future umbrella JIRAs. 

 
 #  *Allow mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex*

In the above two changes, we assume that a vertex’s incoming edges should have 
the same edge property in terms of Scheduling Type, i.e., they are either all 
SEQUENTIAL, or, all CONCURRENT.

We shall extend beyond this assumption in this change to allow a mixture of 
different incoming edge types, as exemplified in Fig. 2. 

see [google 
doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing]
 for figure

Fig. 2 Vertex with mixture of input edges

This change will mainly focus on enriching the VertexManagerPlugin 
implementation that we introduced in our first change, namely, the 
VertexManagerWithConcurrentInput. No API change is expected with this change.

 
h2. Reference

[1] Apache Tez: A Unifying Framework for Modeling and Building Data Processing 
Applications, SIGMOD’15 http://dl.acm.org/authorize?N97131

[2] Bubble Execution: Resource-aware Reliable Analytics at Cloud Scale, VLDB 
2018 [http://www.vldb.org/pvldb/vol11/p746-yin.pdf]

[3] [https://www.tensorflow.org/deploy/distributed]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
