[ https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16680279#comment-16680279 ]
ASF GitHub Bot commented on TEZ-3998: ------------------------------------- Github user beltran commented on a diff in the pull request: https://github.com/apache/tez/pull/33#discussion_r232028676 --- Diff: tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java --- @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.library.vertexmanager; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT; + +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class); + + private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap(); + private int managedTasks; + private boolean tasksScheduled = false; + private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); + private Configuration vertexConfig; + private String vertexName; + private ConcurrentEdgeTriggerType edgeTriggerType; + private boolean allSrcVerticesConfigured; + + int completedUpstreamTasks; + + public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) 
{ + super(context); + } + + @Override + public void initialize() { + if (getContext().getUserPayload() == null) { + throw new TezUncheckedException("user payload cannot be null for VertexManagerWithConcurrentInput"); + } + managedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); + Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties(); + for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) { + if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) { + throw new TezUncheckedException("All input edges to vertex " + vertexName + + " must be CONCURRENT."); + } + String srcVertex = entry.getKey(); + srcVerticesConfigured.put(srcVertex, false); + getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED)); + } + + try { + vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + edgeTriggerType = ConcurrentEdgeTriggerType.valueOf( + vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, + TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT)); + if (!ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(edgeTriggerType)) { + // pending TEZ-3999 + throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now."); + } + LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.", edgeTriggerType); + + vertexName = getContext().getVertexName(); + completedUpstreamTasks = 0; + } + + @Override + public void onVertexStarted(List<TaskAttemptIdentifier> completions) { + onVertexStartedDone.set(true); + scheduleTasks(); + } + + @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + VertexState state = stateUpdate.getVertexState(); + String fromVertex = stateUpdate.getVertexName(); + if (!srcVerticesConfigured.containsKey(fromVertex)) { + throw new IllegalArgumentException("Not expecting state update 
from vertex:" + + fromVertex + " in vertex: " + this.vertexName); + } + + if (!VertexState.CONFIGURED.equals(state)) { + throw new IllegalArgumentException("Received incorrect state notification : " + + state + " from vertex: " + fromVertex + " in vertex: " + this.vertexName); + } + + LOG.info("Received configured notification: " + state + " for vertex: " + + fromVertex + " in vertex: " + this.vertexName); + srcVerticesConfigured.put(fromVertex, true); + + // check for source vertices completely configured + boolean checkAllSrcVerticesConfigured = true; + for (Map.Entry<String, Boolean> entry : srcVerticesConfigured.entrySet()) { + if (!entry.getValue()) { + // vertex not configured + LOG.info("Waiting for vertex {} in vertex {} ", entry.getKey(), this.vertexName); + checkAllSrcVerticesConfigured = false; + break; + } + } + allSrcVerticesConfigured = checkAllSrcVerticesConfigured; --- End diff -- Doing it like this makes me wonder if the tasks can be scheduled twice for example `onVertexStateUpdated` being called twice closely: * Thread 1: calls onVertexStateUpdated * Thread 2: calls onVertexStateUpdated * Thread 1: at `allSrcVerticesConfigured = checkAllSrcVerticesConfigured;` is set to false * Thread 2: at `allSrcVerticesConfigured = checkAllSrcVerticesConfigured;` is set to true now * Thread 1: scheduleTasks schedules all the tasks because `allSrcVerticesConfigured` is true * Thread 2: scheduleTasks schedules all the tasks because `allSrcVerticesConfigured` is true I see the variable `tasksScheduled` is used, but I think it could still happen. Maybe with some `AtomicBoolean.getAndSet` we'd be safer. 
> Allow CONCURRENT edge property in DAG construction and introduce > ConcurrentSchedulingType > ----------------------------------------------------------------------------------------- > > Key: TEZ-3998 > URL: https://issues.apache.org/jira/browse/TEZ-3998 > Project: Apache Tez > Issue Type: Task > Reporter: Yingda Chen > Assignee: Yingda Chen > Priority: Major > > This is the first task related to TEZ-3997 > > |Note: There is no API change in this proposed change. The majority of this > change will be lifting some existing constraints against CONCURRENT edge > type, and addition of a VertexManagerPlugin implementation.| > > This includes enabling the CONCURRENT SchedulingType as a valid edge > property, by removing all the sanity checks against CONCURRENT during DAG > construction/execution. A new VertexManagerPlugin (namely > VertexManagerWithConcurrentInput) will be implemented for vertex with > incoming concurrent edge(s). > In addition, we will assume in this change that > * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges > * No shuffle or data movement is handled by Tez framework when two vertices > are connected through a CONCURRENT edge. Instead, runtime should be > responsible for handling all the data-plane communications (as proposed in > [1]). > Note that the above assumptions are common for scenarios such as whole-DAG or > sub-graph gang scheduling, but they may be relaxed in later implementation, > which may allow mixture of SEQUENTIAL and CONCURRENT edges on the same vertex. > > Most of the (meaningful) scheduling decisions today in Tez are made based on > the notion of (or an extended version of) source task completion. This will > no longer be true in presence of CONCURRENT edge. Instead, events such as > source vertex configured, or source task running will become more relevant > when making scheduling decision for two vertices connected via a CONCURRENT > edge. 
We therefore introduce a new enum *ConcurrentSchedulingType* to > describe the “scheduling timing” for the downstream vertex in such scenarios. > |public enum ConcurrentSchedulingType{ > /** * trigger downstream vertex tasks scheduling by "configured" event of > upstream vertices */ > SOURCE_VERTEX_CONFIGURED, > /** * trigger downstream vertex tasks scheduling by "running" event of > upstream tasks */ > SOURCE_TASK_STARTED > }| > > Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the > scheduling type, which suffices for scenarios of whole-DAG or sub-graph > gang-scheduling, where we want (all the tasks in) the downstream vertex to be > scheduled together with (all the tasks) in the upstream vertex. In this case, > we can leverage the existing onVertexStateUpdated() interface of > VertexManagerPlugin to collect relevant information to assist the scheduling > decision, and *there is no additional API change necessary*. However, in more > subtle cases such as the parameter-server example described in Fig. 1, other > scheduling type would be more relevant, therefore the placeholder for > *ConcurrentSchedulingType* will be introduced in this change as part of the > infrastructure work. > > Finally, since we assume that all communications between two vertices > connected via CONCURRENT edge are handled by application runtime, a > CONCURRENT edge will be assigned a DummyEdgeManager that basically mutes all > DME/VME handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)