[ 
https://issues.apache.org/jira/browse/TEZ-3998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16680606#comment-16680606
 ] 

ASF GitHub Bot commented on TEZ-3998:
-------------------------------------

Github user beltran commented on a diff in the pull request:

    https://github.com/apache/tez/pull/33#discussion_r232101282
  
    --- Diff: 
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
 ---
    @@ -0,0 +1,242 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tez.dag.library.vertexmanager;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.tez.common.TezUtils;
    +import org.apache.tez.dag.api.EdgeProperty;
    +import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
    +import org.apache.tez.dag.api.InputDescriptor;
    +import org.apache.tez.dag.api.TezConfiguration;
    +import org.apache.tez.dag.api.TezUncheckedException;
    +import org.apache.tez.dag.api.VertexManagerPlugin;
    +import org.apache.tez.dag.api.VertexManagerPluginContext;
    +import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
    +import org.apache.tez.dag.api.event.VertexState;
    +import org.apache.tez.dag.api.event.VertexStateUpdate;
    +import org.apache.tez.runtime.api.Event;
    +import org.apache.tez.runtime.api.TaskAttemptIdentifier;
    +import org.apache.tez.runtime.api.events.VertexManagerEvent;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +import java.util.EnumSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import static 
org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
    +
    +public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
    +
    +  private static final Logger LOG = 
LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
    +
    +  private final Map<String, Boolean> srcVerticesConfigured = 
Maps.newConcurrentMap();
    +  private int managedTasks;
    +  private boolean tasksScheduled = false;
    +  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
    +  private Configuration vertexConfig;
    +  private String vertexName;
    +  private ConcurrentEdgeTriggerType edgeTriggerType;
    +  private boolean allSrcVerticesConfigured;
    +
    +  int completedUpstreamTasks;
    +
    +  public VertexManagerWithConcurrentInput(VertexManagerPluginContext 
context) {
    +    super(context);
    +  }
    +
    +  @Override
    +  public void initialize() {
    +    if (getContext().getUserPayload() == null) {
    +      throw new TezUncheckedException("user payload cannot be null for 
VertexManagerWithConcurrentInput");
    +    }
    +    managedTasks = 
getContext().getVertexNumTasks(getContext().getVertexName());
    +    Map<String, EdgeProperty> edges = 
getContext().getInputVertexEdgeProperties();
    +    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
    +      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
    +        throw new TezUncheckedException("All input edges to vertex " + 
vertexName +
    +            "  must be CONCURRENT.");
    +      }
    +      String srcVertex = entry.getKey();
    +      srcVerticesConfigured.put(srcVertex, false);
    +      getContext().registerForVertexStateUpdates(srcVertex, 
EnumSet.of(VertexState.CONFIGURED));
    +    }
    +
    +    try {
    +      vertexConfig = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
    +    } catch (IOException e) {
    +      throw new TezUncheckedException(e);
    +    }
    +    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
    +        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
    +            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
    +    if 
(!ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(edgeTriggerType)) {
    +      // pending TEZ-3999
    +      throw new TezUncheckedException("Only support 
SOURCE_VERTEX_CONFIGURED triggering type for now.");
    +    }
    +    LOG.info("VertexManagerWithConcurrentInput initialized with 
edgeTriggerType {}.", edgeTriggerType);
    +
    +    vertexName = getContext().getVertexName();
    +    completedUpstreamTasks = 0;
    +  }
    +
    +  @Override
    +  public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
    +    onVertexStartedDone.set(true);
    +    scheduleTasks();
    +  }
    +
    +  @Override
    +  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
    +    VertexState state = stateUpdate.getVertexState();
    +    String fromVertex = stateUpdate.getVertexName();
    +    if (!srcVerticesConfigured.containsKey(fromVertex)) {
    +      throw new IllegalArgumentException("Not expecting state update from 
vertex:" +
    +          fromVertex + " in vertex: " + this.vertexName);
    +    }
    +
    +    if (!VertexState.CONFIGURED.equals(state)) {
    +      throw new IllegalArgumentException("Received incorrect state 
notification : " +
    +          state + " from vertex: " + fromVertex + " in vertex: " + 
this.vertexName);
    +    }
    +
    +    LOG.info("Received configured notification: " + state + " for vertex: "
    +        + fromVertex + " in vertex: " + this.vertexName);
    +    srcVerticesConfigured.put(fromVertex, true);
    +
    +    // check for source vertices completely configured
    +    boolean checkAllSrcVerticesConfigured = true;
    +    for (Map.Entry<String, Boolean> entry : 
srcVerticesConfigured.entrySet()) {
    +      if (!entry.getValue()) {
    +        // vertex not configured
    +        LOG.info("Waiting for vertex {} in vertex {} ", entry.getKey(), 
this.vertexName);
    +        checkAllSrcVerticesConfigured = false;
    +        break;
    +      }
    +    }
    +    allSrcVerticesConfigured = checkAllSrcVerticesConfigured;
    --- End diff --
    
    Sorry, I meant `compareAndSet`, to make sure 
`getContext().scheduleTasks(tasksToStart)` is not called twice. I think that 
could still happen with the current code, so it would be something like:
    ```java
    if (tasksScheduled.compareAndSet(false, true)) {
         // schedule tasks
    }
    ```


> Allow CONCURRENT edge property in DAG construction and introduce 
> ConcurrentSchedulingType
> -----------------------------------------------------------------------------------------
>
>                 Key: TEZ-3998
>                 URL: https://issues.apache.org/jira/browse/TEZ-3998
>             Project: Apache Tez
>          Issue Type: Task
>            Reporter: Yingda Chen
>            Assignee: Yingda Chen
>            Priority: Major
>
> This is the first task related to TEZ-3997
>  
> |Note: There is no API change in this proposed change. The majority of this 
> change consists of lifting some existing constraints against the CONCURRENT 
> edge type and adding a VertexManagerPlugin implementation.|
>  
> This includes enabling the CONCURRENT SchedulingType as a valid edge 
> property, by removing all the sanity checks against CONCURRENT during DAG 
> construction/execution. A new VertexManagerPlugin (namely 
> VertexManagerWithConcurrentInput) will be implemented for vertices with 
> incoming CONCURRENT edge(s). 
> In addition, we will assume in this change that 
>  * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges 
>  * No shuffle or data movement is handled by the Tez framework when two 
> vertices are connected through a CONCURRENT edge. Instead, the application 
> runtime should be responsible for handling all the data-plane communications 
> (as proposed in [1]).
> Note that the above assumptions are common for scenarios such as whole-DAG or 
> sub-graph gang scheduling, but they may be relaxed in a later implementation, 
> which may allow a mixture of SEQUENTIAL and CONCURRENT edges on the same vertex.
>  
> Most of the (meaningful) scheduling decisions today in Tez are made based on 
> the notion of (or an extended version of) source task completion. This will 
> no longer be true in the presence of a CONCURRENT edge. Instead, events such 
> as source vertex configured, or source task running, will become more 
> relevant when making scheduling decisions for two vertices connected via a 
> CONCURRENT edge.  We therefore introduce a new enum 
> *ConcurrentSchedulingType* to describe the “scheduling timing” for the 
> downstream vertex in such scenarios. 
> |public enum ConcurrentSchedulingType {
>    /** trigger downstream vertex tasks scheduling by "configured" event of 
> upstream vertices */
>   SOURCE_VERTEX_CONFIGURED,
>    /** trigger downstream vertex tasks scheduling by "running" event of 
> upstream tasks */ 
>   SOURCE_TASK_STARTED 
> }|
>  
> Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the 
> scheduling type, which suffices for scenarios of whole-DAG or sub-graph 
> gang-scheduling, where we want (all the tasks in) the downstream vertex to be 
> scheduled together with (all the tasks in) the upstream vertex. In this case, 
> we can leverage the existing onVertexStateUpdated() interface of 
> VertexManagerPlugin to collect relevant information to assist the scheduling 
> decision, and *there is no additional API change necessary*. However, in more 
> subtle cases such as the parameter-server example described in Fig. 1, other 
> scheduling types would be more relevant, therefore the placeholder for 
> *ConcurrentSchedulingType* will be introduced in this change as part of the 
> infrastructure work.
>  
> Finally, since we assume that all communications between two vertices 
> connected via a CONCURRENT edge are handled by the application runtime, a 
> CONCURRENT edge will be assigned a DummyEdgeManager that basically mutes all 
> DME/VME handling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to