[ 
https://issues.apache.org/jira/browse/TEZ-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14680784#comment-14680784
 ] 

Bikas Saha commented on TEZ-2003:
---------------------------------

Some initial comments on the modified existing code. I have not yet looked at 
the newly added code. 
One repeated item in the comments is the special casing of uber/yarn mode in 
different places. I would expect plugins to come in from the user when the AM 
is created, and either the client or the AppMaster would create default plugins 
for uber/yarn. Thereafter dag/vertex/routers should not need special casing 
for any plugin (like the uber/yarn special casing that exists in all these 
places in the branch). E.g. the vertexmanager only uses 
VertexManagerPluginDescriptor, even for the built-in plugins. Similarly, I would 
expect the communicator/scheduler/launcher wrappers to work only with plugin 
descriptors.
Also creating a ServicePlugin class will help in reducing code duplication and 
make maintenance easier instead of having scheduler id, launcherId and commId 
everywhere.
ContainerSignatureMatcher -> ExecutorSignatureMatcher ?
{code}
+public interface ContainerSignatureMatcher {
{code}
Why is this here?

ServicePluginLifecycle etc. in tez-runtime-api like 
Inputs/Output/InputInitializer etc. ?
Typically we say start() -> stop() instead of shutdown
{code}+public interface ServicePluginLifecycle {
+  void start() throws Exception;
+  void shutdown() throws Exception;
{code}

Why are executeInAm and executeInContainers there?
{code}
+  public static class VertexExecutionContext {
+    final boolean executeInAm;
+    final boolean executeInContainers;
+    final String taskSchedulerName;{code}

Rename to ExecutorEndReason ? Also, how can "An error in the AM" be caused by a 
container running a task?
{code}
+public enum ContainerEndReason {
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+  APPLICATION_ERROR, // An error in the AM caused by user code
+  FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+  LAUNCH_FAILED, // Failure to launch the container
+}{code}

Why does this have schedulerName and taskCommName ?
{code}
+public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
+
+  private final ContainerLaunchContext clc;
+  private final Container container;
+} {code}

Why enableContainers and enableUber?
{code}
+public class ServicePluginsDescriptor {
+
+  private final boolean enableContainers;
+  private final boolean enableUber;
+ {code}

Has the internal one been replaced by this?
{code}
+public enum TaskAttemptEndReason {
+  NODE_FAILED, // Completed because the node running the container was marked as dead
+}{code}

Rename to ExecutorBusy ?
{code}
+  COMMUNICATION_ERROR, // Equivalent to a launch failure
+  SERVICE_BUSY, // Service rejected the task
+  INTERRUPTED_BY_SYSTEM, // Interrupted by the system. e.g. Pre-emption
+  INTERRUPTED_BY_USER, // Interrupted by the user
+
 }{code}

Why does the isLocal flag need to be passed to the Scheduler/Launcher/Communicator 
routers, instead of having a service plugin for local mode?

Is it ensured that the integer for a service plugin will turn out to be the 
same after AM restart?

Why is the yarn scheduler special cased? Launcher/Communicator don't have the 
special casing?
{code}
+  static void processSchedulerDescriptors(List<NamedEntityDescriptor> descriptors, boolean isLocal,
+                                          UserPayload defaultPayload,
+                                          BiMap<String, Integer> schedulerPluginMap) {
.....
+      if (!foundYarn) {
+        NamedEntityDescriptor yarnDescriptor =
+            new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                .setUserPayload(defaultPayload);
+        addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor);
+      }{code}

Why use a different code path for uber/default? They should just work when 
instantiated the same way as a custom plugin.
{code}
+  TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
+                                          int taskCommIndex) {
+    if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
+      return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
+    } else if (taskCommDescriptor.getEntityName()
+        .equals(TezConstants.getTezUberServicePluginName())) {
+      return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
+    } else {
+      return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex],
+          taskCommDescriptor);
     }{code}
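
To illustrate the single code path I have in mind, here is a minimal sketch. All names in it (Communicator, PluginDescriptor, CommunicatorFactory, the "yarn"/"uber" keys) are hypothetical stand-ins rather than the classes in the patch, and it assumes every plugin has a no-arg constructor, which the real plugins may not.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these are the actual Tez classes.
interface Communicator {
  String describe();
}

class YarnCommunicator implements Communicator {
  public String describe() { return "yarn"; }
}

class UberCommunicator implements Communicator {
  public String describe() { return "uber"; }
}

// Carries only the plugin name and the name of the implementing class.
final class PluginDescriptor {
  final String entityName;
  final String className;

  PluginDescriptor(String entityName, String className) {
    this.entityName = entityName;
    this.className = className;
  }
}

final class CommunicatorFactory {
  // Defaults are registered once, up front, as ordinary descriptors.
  static Map<String, PluginDescriptor> defaultDescriptors() {
    Map<String, PluginDescriptor> descriptors = new LinkedHashMap<>();
    descriptors.put("yarn", new PluginDescriptor("yarn", YarnCommunicator.class.getName()));
    descriptors.put("uber", new PluginDescriptor("uber", UberCommunicator.class.getName()));
    return descriptors;
  }

  // One code path: built-in and custom plugins are created the same way,
  // with no branch on the plugin name. Assumes a no-arg constructor.
  static Communicator create(PluginDescriptor descriptor) {
    try {
      Class<?> clazz = Class.forName(descriptor.className);
      return (Communicator) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create plugin " + descriptor.entityName, e);
    }
  }
}
```

With this shape, the code that creates communicators never needs to know which descriptors are built-in; only the place that registers the defaults does.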

Are this and other methods thread-safe with respect to callbacks from multiple plugins?
{code}
+  public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
+      throws IOException, TezException {
{code}
Also in heartbeat(), the following code has been lost during merge.
{code}        for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
          // for now, set the event time on the AM when it is received.
          // this avoids any time disparity between machines.
          tezEvent.setEventReceivedTime(currTime);{code}

Why are the contextImpls not directly invoking/handling the plugins instead of 
going through the router?
{code} 
+  public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
+      Exception {
+    taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+  }{code}

Why this 3 step process?
{code}
+    // Inform all communicators of the dagCompletion.
+    for (int i = 0 ; i < taskCommunicators.length ; i++) {
+      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
+      taskCommunicators[i].dagComplete(dag.getName());
+      ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+    }{code}

Why has the synchronization been removed? I remember this being a subtle race 
condition.
{code}
   @Override
   public void registerTaskAttempt(AMContainerTask amContainerTask,
-      ContainerId containerId) {
+                                  ContainerId containerId, int taskCommId) {
     ContainerInfo containerInfo = registeredContainers.get(containerId);
-    if(containerInfo == null) {
+    if (containerInfo == null) {
       throw new TezUncheckedException("Registering task attempt: "
           + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
     }
-    synchronized (containerInfo) {
-      if(containerInfo.amContainerTask != null) {
-        throw new TezUncheckedException("Registering task attempt: "
-            + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
-            + " with existing assignment to: " + containerInfo.amContainerTask.getTask().getTaskAttemptID());
-      }
-      containerInfo.amContainerTask = amContainerTask;
-      containerInfo.taskPulled = false;
-
-      ContainerId containerIdFromMap =
-          attemptToInfoMap.put(amContainerTask.getTask().getTaskAttemptID(), containerId);
-      if(containerIdFromMap != null) {
-        throw new TezUncheckedException("Registering task attempt: "
-            + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
-            + " when already assigned to: " + containerIdFromMap);
-      }
+    if (containerInfo.taskAttemptId != null) {
+      throw new TezUncheckedException("Registering task attempt: "
+          + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+          + " with existing assignment to: " +
+          containerInfo.taskAttemptId);
     }
-  }{code}
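
For reference, the kind of race I am worried about is a check-then-act on the per-container state. A simplified sketch follows; AttemptRegistry and its String ids are hypothetical and stand in for the listener and the real id types, and the sketch assumes two threads may register against the same container concurrently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry; not the actual listener code.
final class AttemptRegistry {
  static final class ContainerInfo {
    String taskAttemptId; // guarded by the ContainerInfo monitor
  }

  private final Map<String, ContainerInfo> registeredContainers = new ConcurrentHashMap<>();

  void registerContainer(String containerId) {
    registeredContainers.putIfAbsent(containerId, new ContainerInfo());
  }

  void registerTaskAttempt(String containerId, String attemptId) {
    ContainerInfo info = registeredContainers.get(containerId);
    if (info == null) {
      throw new IllegalStateException("Registering to unknown container: " + containerId);
    }
    // The null check and the assignment have to be one atomic step. Without
    // the lock, two threads can both see null and both claim the container.
    synchronized (info) {
      if (info.taskAttemptId != null) {
        throw new IllegalStateException("Container " + containerId
            + " already assigned to: " + info.taskAttemptId);
      }
      info.taskAttemptId = attemptId;
    }
  }
}
```

The concurrent map only protects the lookup; it does nothing for the check and update of the fields inside ContainerInfo, which is why the block needs its own lock.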

The dagCompleteStart/End logic is either broken or unnecessary because the 
correct dag seems to be always received from appContext.getCurrentDAG().
{code}
+  private DAG getDag() {
+    dagChangedReadLock.lock();
+    try {
+      if (dag != null) {
+        return dag;
+      } else {
+        return context.getCurrentDAG();
+      }
+    } finally {
+      dagChangedReadLock.unlock();
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void dagCompleteStart(DAG dag) {
+    dagChangedWriteLock.lock();
+    try {
+      this.dag = dag;
+    } finally {
+      dagChangedWriteLock.unlock();
+    }
+  }
+
+  public void dagCompleteEnd() {
+    dagChangedWriteLock.lock();
+    try {
+      this.dag = null;
+    } finally {
+      dagChangedWriteLock.unlock();
+    }
+  }
+}{code}

Why not keep a cached copy instead of converting each time?
{code}
+  public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() {
+    if (jobPlan.hasDefaultExecutionContext()) {
+      return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
+    } else {
+      return null;
+    }
+  }
+{code}
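
Something along these lines is what I mean by a cached copy. DagSketch and the String types are hypothetical placeholders for the DAG implementation and the proto/API objects, and the sketch assumes the plan does not change after the DAG object is constructed.

```java
// Hypothetical stand-in for the DAG implementation; the String types replace
// the real proto and API objects purely for illustration.
final class DagSketch {
  static int conversions = 0; // counts conversions, only to make the effect visible

  private final String defaultExecutionContext; // converted once; null if absent

  DagSketch(String protoContext) {
    // Convert at construction time, assuming the plan is immutable afterwards.
    this.defaultExecutionContext =
        protoContext == null ? null : convertFromProto(protoContext);
  }

  String getDefaultExecutionContext() {
    return defaultExecutionContext; // no conversion on the read path
  }

  private static String convertFromProto(String proto) {
    conversions++;
    return "converted:" + proto;
  }
}
```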

There is a scheduledTime on master that this is duplicating.
{code}
+  private long scheduleTime = 0;
   private long finishTime = 0;
   private String trackerName;
   private int httpPort;{code}

What is the use of this? 
{code}
+  @Override
+  public long getFirstAttemptStartTime() {
+    readLock.lock();
+    try {
+      // The first attempt will always have an index of 0.
+      return getAttempt(TezTaskAttemptID.getInstance(getTaskId(), 0)).getScheduleTime();
+    } finally {
+      readLock.unlock();
+    }
+  }
+{code}

Why is the vertex code concerned with checking local mode/uber etc.? It should 
simply take the vertex or default
execution context and use it.
{code}
+    boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+
+    String tezDefaultComponentName =
+        isLocal ? TezConstants.getTezUberServicePluginName() :
+            TezConstants.getTezYarnServicePluginName();
+
+    org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext();
+    if (vertexPlan.hasExecutionContext()) {
+      execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext());
+      LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName);
+    } else if (execContext != null) {
+      LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName);
+    }
+    if (execContext != null) {
+      if (execContext.shouldExecuteInAm()) {
+        tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
+      }
+    }
+
+    String taskSchedulerName = tezDefaultComponentName;
+    String containerLauncherName = tezDefaultComponentName;
+    String taskCommName = tezDefaultComponentName;
+
+    if (execContext != null) {
+      if (execContext.getTaskSchedulerName() != null) {
+        taskSchedulerName = execContext.getTaskSchedulerName();
+      }
+      if (execContext.getContainerLauncherName() != null) {
+        containerLauncherName = execContext.getContainerLauncherName();
+      }
+      if (execContext.getTaskCommName() != null) {
+        taskCommName = execContext.getTaskCommName();
+      }
+    }
{code}

Create a base class that takes care of the repeated schedulerId code in all 
scheduler events?
{code}
public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
 
   private final ContainerId containerId;
+  private final int schedulerId;
{code}
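
A rough sketch of the base class idea, with hypothetical class names (the "Sketch" suffix marks them as not being the real event classes) and String ids in place of ContainerId/NodeId:

```java
// Hypothetical base class: the schedulerId field and accessor live in one place.
abstract class SchedulerEventSketch {
  private final int schedulerId;

  protected SchedulerEventSketch(int schedulerId) {
    this.schedulerId = schedulerId;
  }

  public final int getSchedulerId() {
    return schedulerId;
  }
}

// Subclasses only declare what is specific to them.
final class DeallocateContainerEventSketch extends SchedulerEventSketch {
  private final String containerId;

  DeallocateContainerEventSketch(String containerId, int schedulerId) {
    super(schedulerId);
    this.containerId = containerId;
  }

  String getContainerId() {
    return containerId;
  }
}

final class NodeBlacklistUpdateEventSketch extends SchedulerEventSketch {
  private final String nodeId;
  private final boolean blacklisted;

  NodeBlacklistUpdateEventSketch(String nodeId, boolean blacklisted, int schedulerId) {
    super(schedulerId);
    this.nodeId = nodeId;
    this.blacklisted = blacklisted;
  }

  String getNodeId() {
    return nodeId;
  }

  boolean isBlacklisted() {
    return blacklisted;
  }
}
```

In the actual patch the natural home for the field may simply be AMSchedulerEvent itself, if every scheduler event ends up carrying a schedulerId.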

Why is the end reason here? I don't see any use of it anywhere in the patch.
{code}  
   @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded) {
+  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
     return taskRequestHandler.addDeallocateTaskRequest(task);
   }
{code}

Why are uber/container handled differently?
{code}
+  @VisibleForTesting
+  TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
+                                                   AppContext appContext,
+                                                   NamedEntityDescriptor taskSchedulerDescriptor,
+                                                   long customAppIdIdentifier,
+                                                   int schedulerId) {
+    TaskSchedulerContext rawContext =
+        new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
+            customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
+    TaskSchedulerContext wrappedContext = wrapTaskSchedulerContext(rawContext);
+    String schedulerName = taskSchedulerDescriptor.getEntityName();
+    if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
+      return createYarnTaskScheduler(wrappedContext, schedulerId);
+    } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
+      return createUberTaskScheduler(wrappedContext, schedulerId);
+    } else {
+      return createCustomTaskScheduler(wrappedContext, taskSchedulerDescriptor, schedulerId);
     }
-    else {
-      return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
-          host, port, trackingUrl, appContext);
+  }
{code}

Have a base class that prevents code duplication for launcherId etc. for all 
AMContainerEvents
{code}
   public AMContainerEventLaunchRequest(ContainerId containerId,
-      TezVertexID vertexId, ContainerContext containerContext) {
+      TezVertexID vertexId, ContainerContext containerContext,
+      int launcherId, int taskCommId) {
     super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
     this.vertexId = vertexId;
     this.containerContext = containerContext;
+    this.launcherId = launcherId;
+    this.taskCommId = taskCommId;
   }{code}

Create an object to represent service plugins instead of having 
sched/launcher/comm everywhere. This will make the code cleaner 
and easier to maintain. Any changes, like adding helper methods or new plugins, 
will be easier to make.
{code}
@@ -80,6 +84,9 @@ public class AMContainerImpl implements AMContainer {
   private final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
   private final ContainerSignatureMatcher signatureMatcher;
+  private final int schedulerId;
+  private final int launcherId;
+  private final int taskCommId;
{code} 
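
As a sketch of such an object (ServicePluginInfo is a hypothetical name, and the three int fields simply mirror the ids added in the diff above):

```java
import java.util.Objects;

// Hypothetical value object bundling the three plugin ids that are currently
// passed around as separate ints.
final class ServicePluginInfo {
  private final int schedulerId;
  private final int launcherId;
  private final int taskCommId;

  ServicePluginInfo(int schedulerId, int launcherId, int taskCommId) {
    this.schedulerId = schedulerId;
    this.launcherId = launcherId;
    this.taskCommId = taskCommId;
  }

  int getSchedulerId() { return schedulerId; }
  int getLauncherId() { return launcherId; }
  int getTaskCommId() { return taskCommId; }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ServicePluginInfo)) {
      return false;
    }
    ServicePluginInfo other = (ServicePluginInfo) o;
    return schedulerId == other.schedulerId
        && launcherId == other.launcherId
        && taskCommId == other.taskCommId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(schedulerId, launcherId, taskCommId);
  }

  @Override
  public String toString() {
    return "ServicePluginInfo{schedulerId=" + schedulerId
        + ", launcherId=" + launcherId + ", taskCommId=" + taskCommId + "}";
  }
}
```

Constructors like the one in AMContainerImpl would then take a single ServicePluginInfo, and adding a fourth kind of plugin later would touch this class rather than every signature.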

AMContainer end reasons need a detailed look.

Then let's call it schedulerId unless there is a reason for it to be something else.
{code}
+  private final int sourceId; // Effectively the schedulerId
 
-  public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+  public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) {
{code}

The pending changes to AMNode (in general, node handling per scheduler) really 
need to be prioritized. They look quite incomplete. Nodes are closely tied to 
the execution environment, so the current state of the code, with some bits 
handled and some bits not, can be error prone.
{code}
@@ -247,7 +249,7 @@ public class AMNodeImpl implements AMNode {
 
   /* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */
   protected boolean registerBadNodeAndShouldBlacklist() {
-    return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this);
+    return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId);
   }
@@ -257,7 +259,8 @@ public class AMNodeImpl implements AMNode {
-    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
+    // TODO TEZ-2124 node tracking per ext source
+    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0));
@@ -363,7 +366,8 @@ public class AMNodeImpl implements AMNode {
-        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false));
+        // TODO TEZ-2124 node tracking per ext source
+        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0));
{code}

What is a source? Can we call it scheduler or execution environment?
{code}   static final Logger LOG = LoggerFactory.getLogger(AMNodeTracker.class);
   
-  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
-  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+  private final ConcurrentMap<Integer, PerSourceNodeTracker> perSourceNodeTrackers;
+{code}

Remove commented code
{code}
+                  MockDAGAppMaster.this.getContext().getClock().getTime()));
+//              TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
+//                  cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
+              TaskHeartbeatRequest request =
+                  new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
+                      50000);{code}

New code misses the ++cData.numUpdates
{code}
-                  getContext().getClock().getTime()));
-              TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
-                  cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+                  MockDAGAppMaster.this.getContext().getClock().getTime()));
+              TaskHeartbeatRequest request =
+                  new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
+                      10000);{code}

Why do we need the new session token related code here?
{code}-  public void testPortRange_NotSpecified() {
+  public void testPortRange_NotSpecified() throws IOException {
     Configuration conf = new Configuration();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+        "fakeIdentifier"));
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+        new JobTokenSecretManager());
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setSessionToken(sessionToken, credentials);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
     taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList(
+        new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+            .setUserPayload(userPayload)));{code}

Create shared method instead of duplicating code?
{code}@@ -285,8 +287,9 @@ public class TestTaskAttempt {
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
@@ -335,8 +338,9 @@ public class TestTaskAttempt {
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
@@ -436,8 +440,9 @@ public class TestTaskAttempt {
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+    doReturn(taskComm).when(taListener).getTaskCommunicator(0);
{code}

This seems to be duplicating the logic in the DAGAppMaster. That would be 
fragile and tests would not break if a future change
in the App master actually broke this logic.
{code}
+  private static class ExecutionContextTestInfoHolder {
+
+    static final String TASK_SCHEDULER_NAME_BASE = "TASK_SCHEDULER";
+    static final String CONTAINER_LAUNCHER_NAME_BASE = "CONTAINER_LAUNCHER";
+    static final String TASK_COMM_NAME_BASE = "TASK_COMMUNICATOR";
+
+    static String append(String base, int index) {
+      return base + index;
+    }
+
+    final String vertexName;
+    final VertexExecutionContext defaultExecutionContext;
+    final VertexExecutionContext vertexExecutionContext;
+    final BiMap<String, Integer> taskSchedulers = HashBiMap.create();
+    final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
+    final BiMap<String, Integer> taskComms = HashBiMap.create();
+    final AppContext appContext;
+{code}

Why is this mock return of the RUNNING state missing? Allocated containers check 
for the running state. Is that handled by the new code in 
TestTaskSchedulerHelpers?
{code}
   @SuppressWarnings({ "unchecked" })
   @Test(timeout=10000)
   public void testTaskSchedulerWithReuse() throws Exception {
     RackResolver.init(new YarnConfiguration());
-    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-    AppContext mockAppContext = mock(AppContext.class);
-    when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
 {code}


> [Umbrella] Allow Tez to co-ordinate execution to external services
> ------------------------------------------------------------------
>
>                 Key: TEZ-2003
>                 URL: https://issues.apache.org/jira/browse/TEZ-2003
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Siddharth Seth
>         Attachments: 2003_20150728.1.txt, 2003_20150807.1.txt, 
> 2003_20150807.2.txt, Tez With External Services.pdf
>
>
> The Tez engine itself takes care of co-ordinating execution - controlling how 
> data gets routed (different connection patterns), fault tolerance, scheduling 
> of work, etc.
> This is currently tied to TaskSpecs defined within Tez and on containers 
> launched by Tez itself (TezChild).
> The proposal is to allow Tez to work with external services instead of just 
> containers launched by Tez. This involves several more pluggable layers to 
> work with alternate Task Specifications, custom launch and task allocation 
> mechanics, as well as custom scheduling sources.
> A simple example would be a simple process with the capability to execute 
> multiple Tez TaskSpecs as threads. In such a case, a container launch isn't 
> really needed and can be mocked. Sourcing / scheduling containers would need to 
> be pluggable.
> A more advanced example would be LLAP (HIVE-7926; 
> https://issues.apache.org/jira/secure/attachment/12665704/LLAPdesigndocument.pdf).
> This works with custom interfaces - which would need to be supported by Tez, 
> along with a custom event model which would need translation hooks.
> Tez should be able to work with a combination of certain vertices running in 
> external services and others running in regular Tez containers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
