[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16680624#comment-16680624 ]
ASF GitHub Bot commented on TEZ-3998:
-------------------------------------

Github user yingdachen commented on a diff in the pull request:

    https://github.com/apache/tez/pull/33#discussion_r232102507

    --- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---
    @@ -2440,23 +2453,30 @@ public void run() {
           }
         }
    -  private void startDAG() throws IOException, TezException {
    +  private boolean hasConcurrentEdge(DAGPlan dagPlan) {
    +    boolean hasConcurrentEdge = false;
    +    for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
    +      if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
    +        return true;
    +      }
    +    }
    +    return hasConcurrentEdge;
    +  }
    +
    +  private DAGPlan readDAGPlanFile() throws IOException, TezException {
         FileInputStream dagPBBinaryStream = null;
    +    DAGPlan dagPlan = null;
         try {
    -      DAGPlan dagPlan = null;
    -
    -      // Read the protobuf DAG
           dagPBBinaryStream = new FileInputStream(new File(workingDirectory, TezConstants.TEZ_PB_PLAN_BINARY_NAME));
           dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
    -
    -      startDAG(dagPlan, null);
    --- End diff --

    Line 1993 is the line with if (recoveredDAGData != null). Inside that branch,
    startDAG would not be called; it would only be called in the corresponding
    "else" branch.

> Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType
> -----------------------------------------------------------------------------------------
>
>                 Key: TEZ-3998
>                 URL: https://issues.apache.org/jira/browse/TEZ-3998
>             Project: Apache Tez
>          Issue Type: Task
>            Reporter: Yingda Chen
>            Assignee: Yingda Chen
>            Priority: Major
>
> This is the first task related to TEZ-3997.
>
> Note: There is no API change in this proposed change.
> The majority of this change will be lifting some existing constraints against
> the CONCURRENT edge type, and adding a VertexManagerPlugin implementation.
>
> This includes enabling the CONCURRENT SchedulingType as a valid edge property
> by removing all the sanity checks against CONCURRENT during DAG
> construction/execution. A new VertexManagerPlugin (namely
> VertexManagerWithConcurrentInput) will be implemented for vertices with
> incoming concurrent edge(s).
> In addition, we will assume in this change that
> * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges
> * No shuffle or data movement is handled by the Tez framework when two vertices
>   are connected through a CONCURRENT edge. Instead, the runtime should be
>   responsible for handling all data-plane communication (as proposed in [1]).
>
> Note that the above assumptions are common for scenarios such as whole-DAG or
> sub-graph gang scheduling, but they may be relaxed in a later implementation,
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>
> Most of the (meaningful) scheduling decisions in Tez today are made based on
> the notion of (or an extended version of) source task completion. This will
> no longer hold in the presence of CONCURRENT edges. Instead, events such as
> "source vertex configured" or "source task running" will become more relevant
> when making scheduling decisions for two vertices connected via a CONCURRENT
> edge. We therefore introduce a new enum *ConcurrentSchedulingType* to
> describe the “scheduling timing” for the downstream vertex in such scenarios.
>
>   public enum ConcurrentSchedulingType {
>     /**
>      * Trigger scheduling of downstream vertex tasks on the "configured" event
>      * of upstream vertices.
>      */
>     SOURCE_VERTEX_CONFIGURED,
>     /**
>      * Trigger scheduling of downstream vertex tasks on the "running" event of
>      * upstream tasks.
>      */
>     SOURCE_TASK_STARTED
>   }
>
> Note that in this change we will only use SOURCE_VERTEX_CONFIGURED as the
> scheduling type, which suffices for whole-DAG or sub-graph gang-scheduling
> scenarios, where we want (all the tasks in) the downstream vertex to be
> scheduled together with (all the tasks in) the upstream vertex. In this case,
> we can leverage the existing onVertexStateUpdated() interface of
> VertexManagerPlugin to collect the information needed for the scheduling
> decision, and *there is no additional API change necessary*. However, in more
> subtle cases such as the parameter-server example described in Fig. 1, other
> scheduling types would be more relevant; therefore the placeholder for
> *ConcurrentSchedulingType* will be introduced in this change as part of the
> infrastructure work.
>
> Finally, since we assume that all communication between two vertices
> connected via a CONCURRENT edge is handled by the application runtime, a
> CONCURRENT edge will be assigned a DummyEdgeManager that essentially mutes all
> DME/VME (DataMovementEvent/VertexManagerEvent) handling.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
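The SOURCE_VERTEX_CONFIGURED policy and the "no mixing of SEQUENTIAL and CONCURRENT inputs" assumption described in the issue can be illustrated with a small, self-contained sketch. This is not Tez's VertexManagerWithConcurrentInput and does not use the Tez API. The class ConcurrentInputSchedulerSketch, the local EdgeSchedulingType enum (a stand-in for DAGProtos.PlanEdgeSchedulingType), and the onSourceVertexConfigured callback are all hypothetical. In Tez, the corresponding state changes would arrive through VertexManagerPlugin.onVertexStateUpdated().

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for DAGProtos.PlanEdgeSchedulingType; not the Tez API.
enum EdgeSchedulingType { SEQUENTIAL, CONCURRENT }

// Same constants as the enum proposed in the issue.
enum ConcurrentSchedulingType {
  /** Schedule downstream tasks on the "configured" event of upstream vertices. */
  SOURCE_VERTEX_CONFIGURED,
  /** Schedule downstream tasks on the "running" event of upstream tasks. */
  SOURCE_TASK_STARTED
}

/**
 * Hypothetical, simplified model of a vertex manager for a vertex whose incoming
 * edges are all CONCURRENT. Only SOURCE_VERTEX_CONFIGURED is modeled.
 */
final class ConcurrentInputSchedulerSketch {
  private final Set<String> unconfiguredSources;
  private final int numTasks;
  private boolean scheduled = false;

  ConcurrentInputSchedulerSketch(Map<String, EdgeSchedulingType> incomingEdges,
      int numTasks, ConcurrentSchedulingType type) {
    if (type != ConcurrentSchedulingType.SOURCE_VERTEX_CONFIGURED) {
      throw new UnsupportedOperationException("Not modeled: " + type);
    }
    // Enforce the stated assumption: every incoming edge is CONCURRENT (no mixing).
    Set<EdgeSchedulingType> kinds = EnumSet.noneOf(EdgeSchedulingType.class);
    kinds.addAll(incomingEdges.values());
    if (!kinds.equals(EnumSet.of(EdgeSchedulingType.CONCURRENT))) {
      throw new IllegalArgumentException("Expected only CONCURRENT inputs, got " + kinds);
    }
    this.unconfiguredSources = new HashSet<>(incomingEdges.keySet());
    this.numTasks = numTasks;
  }

  /**
   * Hypothetical callback for a source vertex reaching CONFIGURED.
   * Returns the task indices to schedule now (empty until all sources are configured).
   */
  List<Integer> onSourceVertexConfigured(String sourceVertex) {
    unconfiguredSources.remove(sourceVertex);
    if (scheduled || !unconfiguredSources.isEmpty()) {
      return List.of();
    }
    scheduled = true;
    // Gang-schedule: release every task of the downstream vertex in one batch.
    List<Integer> tasks = new ArrayList<>(numTasks);
    for (int i = 0; i < numTasks; i++) {
      tasks.add(i);
    }
    return tasks;
  }
}
```

For example, with two CONCURRENT sources "A" and "B" and three downstream tasks, configuring "A" schedules nothing. Configuring "B" then releases tasks 0, 1 and 2 together, and a vertex with one SEQUENTIAL and one CONCURRENT input is rejected at construction time. Releasing all tasks in one batch mirrors the whole-DAG or sub-graph gang-scheduling case the issue targets. SOURCE_TASK_STARTED is deliberately left unmodeled, matching the issue's statement that only SOURCE_VERTEX_CONFIGURED is used in this change.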