[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16680505#comment-16680505 ]
ASF GitHub Bot commented on TEZ-3998: ------------------------------------- Github user yingdachen commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232084381 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) { + super(context); + } + + @Override + public void initialize() { + if (getContext().getUserPayload() == null) { --- End diff -- thanks for pointing this out. done > Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > ----------------------------------------------------------------------------------------- > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task > Reporter: Yingda Chen > Assignee: Yingda Chen > Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexMangerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity check against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffice for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VextexMangerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle case such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mute all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)