TEZ-2651. Pluggable services should not extend AbstractService. (sseth)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fc8a4ce5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fc8a4ce5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fc8a4ce5

Branch: refs/heads/TEZ-2003
Commit: fc8a4ce598d52496c19ec2508e4ebed7ef22eb63
Parents: 82c24ac
Author: Siddharth Seth <[email protected]>
Authored: Tue Jul 28 14:55:40 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Fri Aug 14 13:46:45 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../serviceplugins/api/ContainerLauncher.java   | 18 ++++++++++--
 .../apache/tez/dag/api/TaskCommunicator.java    | 30 +++++++++++++++++---
 .../tez/dag/api/TaskCommunicatorContext.java    |  5 ++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  4 +--
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 16 ++++++-----
 .../dag/app/TaskCommunicatorContextImpl.java    |  9 ++++++
 .../tez/dag/app/TezTaskCommunicatorImpl.java    | 24 ++++++----------
 .../dag/app/launcher/ContainerLauncherImpl.java |  6 ++--
 .../app/launcher/ContainerLauncherRouter.java   | 12 ++++++--
 .../app/launcher/LocalContainerLauncher.java    |  6 ++--
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  6 ++--
 .../app/TestTaskAttemptListenerImplTezDag.java  |  8 +++---
 .../TezTestServiceContainerLauncher.java        |  6 ++--
 .../TezTestServiceNoOpContainerLauncher.java    |  2 +-
 .../TezTestServiceTaskCommunicatorImpl.java     | 29 +++++++++----------
 16 files changed, 116 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index a51669d..e57f76f 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -36,5 +36,6 @@ ALL CHANGES:
   TEZ-2124. Change Node tracking to work per external container source.
   TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
   TEZ-2005. Define basic interface for pluggable TaskScheduler.
+  TEZ-2651. Pluggable services should not extend AbstractService.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
index 218edb6..8337dcb 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -17,6 +17,7 @@ package org.apache.tez.serviceplugins.api;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.ServicePluginLifecycle;
 
 /**
  * Plugin to allow custom container launchers to be written to launch containers on different types
@@ -25,18 +26,29 @@ import org.apache.hadoop.service.AbstractService;
 
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public abstract class ContainerLauncher extends AbstractService {
+public abstract class ContainerLauncher implements ServicePluginLifecycle {
 
   private final ContainerLauncherContext containerLauncherContext;
 
   // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService.
   // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal
   // configuration to the services if populated with the AM Configuration
-  public ContainerLauncher(String name, ContainerLauncherContext containerLauncherContext) {
-    super(name);
+  public ContainerLauncher(ContainerLauncherContext containerLauncherContext) {
     this.containerLauncherContext = containerLauncherContext;
   }
 
+  @Override
+  public void initialize() throws Exception {
+  }
+
+  @Override
+  public void start() throws Exception {
+  }
+
+  @Override
+  public void shutdown() throws Exception {
+  }
+
   public final ContainerLauncherContext getContext() {
     return this.containerLauncherContext;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 05e437c..f221414 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -18,9 +18,9 @@ import java.net.InetSocketAddress;
 import java.util.Map;
 
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.ServicePluginLifecycle;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -28,11 +28,33 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 
 // TODO TEZ-2003 Move this into the tez-api module
-public abstract class TaskCommunicator extends AbstractService {
-  public TaskCommunicator(String name) {
-    super(name);
+public abstract class TaskCommunicator implements ServicePluginLifecycle {
+
+  private final TaskCommunicatorContext taskCommunicatorContext;
+
+  public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+    this.taskCommunicatorContext = taskCommunicatorContext;
+  }
+
+  public TaskCommunicatorContext getContext() {
+    return taskCommunicatorContext;
+  }
+
+  @Override
+  public void initialize() throws Exception {
   }
 
+  @Override
+  public void start() throws Exception {
+  }
+
+  @Override
+  public void shutdown() throws Exception {
+  }
+
+  // TODO Post TEZ-2003 Move this into the API module. Moving this requires abstractions for
+  // TaskSpec and related classes. (assuming that's efficient for execution)
+
   // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
   // TODO When talking to an external service, this plugin implementer may need access to a host:port
   public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index b6e63f7..ab32ec1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -18,6 +18,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -35,6 +36,9 @@ public interface TaskCommunicatorContext {
 
   // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
 
+  // TODO TEZ-2003 To be replaced by getInitialPayload
+  Configuration getInitialConfiguration();
+
   ApplicationAttemptId getApplicationAttemptId();
   Credentials getCredentials();
 
@@ -42,6 +46,7 @@ public interface TaskCommunicatorContext {
   boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
 
   // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update
+  // KKK Rename this API
   TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
 
   boolean isKnownContainer(ContainerId containerId);

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index ef27ddf..f3914d8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1047,8 +1047,8 @@ public class DAGAppMaster extends AbstractService {
                                                           String[] 
taskCommunicatorClasses,
                                                           boolean isLocal) {
     TaskAttemptListener lis =
-        new TaskAttemptListenerImpTezDag(context, thh, chh, jobTokenSecretManager,
-            taskCommunicatorClasses, isLocal);
+        new TaskAttemptListenerImpTezDag(context, thh, chh,
+            taskCommunicatorClasses, amConf, isLocal);
     return lis;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 47b63dd..599c208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -61,7 +62,6 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.common.security.JobTokenSecretManager;
 
 
 @SuppressWarnings("unchecked")
@@ -75,6 +75,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private final AppContext context;
   private final TaskCommunicator[] taskCommunicators;
   private final TaskCommunicatorContext[] taskCommunicatorContexts;
+  protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
 
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -99,9 +100,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   public TaskAttemptListenerImpTezDag(AppContext context,
                                       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
-                                      // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager.
-                                      JobTokenSecretManager jobTokenSecretManager,
                                       String [] taskCommunicatorClassIdentifiers,
+                                      Configuration conf,
                                       boolean isPureLocalMode) {
     super(TaskAttemptListenerImpTezDag.class.getName());
     this.context = context;
@@ -118,9 +118,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
     this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length];
+    this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorClassIdentifiers.length];
     for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
-      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, i);
+      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i);
       taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
+      taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
     }
     // TODO TEZ-2118 Start using taskCommunicator indices properly
   }
@@ -129,15 +131,15 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   public void serviceStart() {
     // TODO Why is init tied to serviceStart
     for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      taskCommunicators[i].init(getConfig());
-      taskCommunicators[i].start();
+      taskCommunicatorServiceWrappers[i].init(getConfig());
+      taskCommunicatorServiceWrappers[i].start();
     }
   }
 
   @Override
   public void serviceStop() {
     for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      taskCommunicators[i].stop();
+      taskCommunicatorServiceWrappers[i].stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 50e006d..035db93 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -23,6 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -48,14 +49,17 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   private final int taskCommunicatorIndex;
   private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
   private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
+  private final Configuration conf;
 
   private DAG dag;
 
   public TaskCommunicatorContextImpl(AppContext appContext,
                                      TaskAttemptListenerImpTezDag 
taskAttemptListener,
+                                     Configuration conf,
                                      int taskCommunicatorIndex) {
     this.context = appContext;
     this.taskAttemptListener = taskAttemptListener;
+    this.conf = conf;
     this.taskCommunicatorIndex = taskCommunicatorIndex;
 
     ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock();
@@ -64,6 +68,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   }
 
   @Override
+  public Configuration getInitialConfiguration() {
+    return conf;
+  }
+
+  @Override
   public ApplicationAttemptId getApplicationAttemptId() {
     return context.getApplicationAttemptId();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 0374022..93b5b43 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -67,7 +67,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator 
{
   private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
       null, true, null, null, false);
 
-  private final TaskCommunicatorContext taskCommunicatorContext;
   private final TezTaskUmbilicalProtocol taskUmbilical;
 
   protected final ConcurrentMap<ContainerId, ContainerInfo> 
registeredContainers =
@@ -116,25 +115,24 @@ public class TezTaskCommunicatorImpl extends 
TaskCommunicator {
    * Construct the service.
    */
   public TezTaskCommunicatorImpl(TaskCommunicatorContext 
taskCommunicatorContext) {
-    super(TezTaskCommunicatorImpl.class.getName());
-    this.taskCommunicatorContext = taskCommunicatorContext;
+    super(taskCommunicatorContext);
     this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
-    this.tokenIdentifier = 
this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
+    this.tokenIdentifier = 
taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
     this.sessionToken = 
TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     startRpcServer();
   }
 
   @Override
-  public void serviceStop() {
+  public void shutdown() {
     stopRpcServer();
   }
 
   protected void startRpcServer() {
-    Configuration conf = getConfig();
+    Configuration conf = getContext().getInitialConfiguration();
     try {
       JobTokenSecretManager jobTokenSecretManager =
           new JobTokenSecretManager();
@@ -281,10 +279,6 @@ public class TezTaskCommunicatorImpl extends 
TaskCommunicator {
     return sessionToken;
   }
 
-  protected TaskCommunicatorContext getTaskCommunicatorContext() {
-    return taskCommunicatorContext;
-  }
-
   public TezTaskUmbilicalProtocol getUmbilical() {
     return this.taskUmbilical;
   }
@@ -305,7 +299,7 @@ public class TezTaskCommunicatorImpl extends 
TaskCommunicator {
         }
         task = getContainerTask(containerId);
         if (task != null && !task.shouldDie()) {
-          taskCommunicatorContext
+          getContext()
               .taskStartedRemotely(task.getTaskSpec().getTaskAttemptID(), 
containerId);
         }
       }
@@ -317,7 +311,7 @@ public class TezTaskCommunicatorImpl extends 
TaskCommunicator {
 
     @Override
     public boolean canCommit(TezTaskAttemptID taskAttemptId) throws 
IOException {
-      return taskCommunicatorContext.canCommit(taskAttemptId);
+      return getContext().canCommit(taskAttemptId);
     }
 
     @Override
@@ -370,7 +364,7 @@ public class TezTaskCommunicatorImpl extends 
TaskCommunicator {
         TaskHeartbeatRequest tRequest = new 
TaskHeartbeatRequest(request.getContainerIdentifier(),
             request.getCurrentTaskAttemptID(), request.getEvents(), 
request.getStartIndex(),
             request.getPreRoutedStartIndex(), request.getMaxEvents());
-        tResponse = taskCommunicatorContext.heartbeat(tRequest);
+        tResponse = getContext().heartbeat(tRequest);
       }
       TezHeartbeatResponse response = new TezHeartbeatResponse();
       response.setLastRequestId(requestId);
@@ -402,7 +396,7 @@ public class TezTaskCommunicatorImpl extends 
TaskCommunicator {
     ContainerInfo containerInfo = registeredContainers.get(containerId);
     ContainerTask task = null;
     if (containerInfo == null) {
-      if (taskCommunicatorContext.isKnownContainer(containerId)) {
+      if (getContext().isKnownContainer(containerId)) {
         LOG.info("Container with id: " + containerId
             + " is valid, but no longer registered, and will be killed");
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index fe0178c..34c7bc0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -223,7 +223,7 @@ public class ContainerLauncherImpl extends 
ContainerLauncher {
   }
 
   public ContainerLauncherImpl(ContainerLauncherContext 
containerLauncherContext) {
-    super(ContainerLauncherImpl.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
     this.conf = new 
Configuration(containerLauncherContext.getInitialConfiguration());
     conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
@@ -235,7 +235,7 @@ public class ContainerLauncherImpl extends 
ContainerLauncher {
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     // pass a copy of config to ContainerManagementProtocolProxy until 
YARN-3497 is fixed
     cmProxy =
         new ContainerManagementProtocolProxy(conf);
@@ -307,7 +307,7 @@ public class ContainerLauncherImpl extends 
ContainerLauncher {
   }
 
   @Override
-  public void serviceStop() {
+  public void shutdown() {
     if(!serviceStopped.compareAndSet(false, true)) {
       LOG.info("Ignoring multiple stops");
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 9f741cf..7c6a6a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -45,6 +46,7 @@ public class ContainerLauncherRouter extends AbstractService
 
   private final ContainerLauncher containerLaunchers[];
   private final ContainerLauncherContext containerLauncherContexts[];
+  protected final ServicePluginLifecycleAbstractService[] 
containerLauncherServiceWrappers;
   private final AppContext appContext;
 
   @VisibleForTesting
@@ -53,6 +55,8 @@ public class ContainerLauncherRouter extends AbstractService
     this.appContext = context;
     containerLaunchers = new ContainerLauncher[] {containerLauncher};
     containerLauncherContexts = new ContainerLauncherContext[] 
{containerLauncher.getContext()};
+    containerLauncherServiceWrappers = new 
ServicePluginLifecycleAbstractService[]{
+        new ServicePluginLifecycleAbstractService(containerLauncher)};
   }
 
   // Accepting conf to setup final parameters, if required.
@@ -75,6 +79,7 @@ public class ContainerLauncherRouter extends AbstractService
     }
     containerLauncherContexts = new 
ContainerLauncherContext[containerLauncherClassIdentifiers.length];
     containerLaunchers = new 
ContainerLauncher[containerLauncherClassIdentifiers.length];
+    containerLauncherServiceWrappers = new 
ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length];
 
 
     for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
@@ -82,6 +87,7 @@ public class ContainerLauncherRouter extends AbstractService
       containerLauncherContexts[i] = containerLauncherContext;
       containerLaunchers[i] = 
createContainerLauncher(containerLauncherClassIdentifiers[i], context,
           containerLauncherContext, taskAttemptListener, workingDirectory, 
isPureLocalMode, conf);
+      containerLauncherServiceWrappers[i] = new 
ServicePluginLifecycleAbstractService(containerLaunchers[i]);
     }
   }
 
@@ -130,21 +136,21 @@ public class ContainerLauncherRouter extends 
AbstractService
   @Override
   public void serviceInit(Configuration conf) {
     for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      ((AbstractService) containerLaunchers[i]).init(conf);
+      containerLauncherServiceWrappers[i].init(conf);
     }
   }
 
   @Override
   public void serviceStart() {
     for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      ((AbstractService) containerLaunchers[i]).start();
+      containerLauncherServiceWrappers[i].start();
     }
   }
 
   @Override
   public void serviceStop() {
     for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      ((AbstractService) containerLaunchers[i]).stop();
+      containerLauncherServiceWrappers[i].stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index a1b8e29..3975111 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -111,7 +111,7 @@ public class LocalContainerLauncher extends 
ContainerLauncher {
     // starts up. It's not possible to set these up via a static payload.
     // Will need some kind of mechanism to dynamically crate payloads / bind 
to parameters
     // after the AM starts up.
-    super(LocalContainerLauncher.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
     this.context = context;
     this.tal = taskAttemptListener;
     this.workingDirectory = workingDirectory;
@@ -139,14 +139,14 @@ public class LocalContainerLauncher extends 
ContainerLauncher {
   }
 
   @Override
-  public void serviceStart() throws Exception {
+  public void start() throws Exception {
     eventHandlingThread =
         new Thread(new TezSubTaskRunner(), 
"LocalContainerLauncher-SubTaskRunner");
     eventHandlingThread.start();
   }
 
   @Override
-  public void serviceStop() throws Exception {
+  public void shutdown() throws Exception {
     if (!serviceStopped.compareAndSet(false, true)) {
       LOG.info("Service Already stopped. Ignoring additional stop");
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 3c3c6a7..21ae5f7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -139,7 +139,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
     public MockContainerLauncher(AtomicBoolean goFlag,
                                  ContainerLauncherContext 
containerLauncherContext) {
-      super("MockContainerLauncher", containerLauncherContext);
+      super(containerLauncherContext);
       this.goFlag = goFlag;
     }
 
@@ -182,7 +182,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
     
     @Override
-    public void serviceStart() throws Exception {
+    public void start() throws Exception {
       taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
       taskCommunicator = (TezTaskCommunicatorImpl) 
taListener.getTaskCommunicator(0);
       eventHandlingThread = new Thread(this);
@@ -199,7 +199,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
 
     @Override
-    public void serviceStop() throws Exception {
+    public void shutdown() throws Exception {
       if (eventHandlingThread != null) {
         eventHandlingThread.interrupt();
         eventHandlingThread.join(2000l);

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index df643e4..41a7373 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -299,7 +299,7 @@ public class TestTaskAttemptListenerImplTezDag {
     sessionToken.setService(identifier.getJobId());
     TokenCache.setSessionToken(sessionToken, credentials);
     taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-        mock(TaskHeartbeatHandler.class), 
mock(ContainerHeartbeatHandler.class), null, null, false);
+        mock(TaskHeartbeatHandler.class), 
mock(ContainerHeartbeatHandler.class), null, conf, false);
     // no exception happen, should started properly
     taskAttemptListener.init(conf);
     taskAttemptListener.start();
@@ -319,7 +319,7 @@ public class TestTaskAttemptListenerImplTezDag {
 
       conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
       taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-          mock(TaskHeartbeatHandler.class), 
mock(ContainerHeartbeatHandler.class), null, null, false);
+          mock(TaskHeartbeatHandler.class), 
mock(ContainerHeartbeatHandler.class), null, conf, false);
       taskAttemptListener.init(conf);
       taskAttemptListener.start();
       int resultedPort = 
taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
@@ -375,10 +375,10 @@ public class TestTaskAttemptListenerImplTezDag {
     public TaskAttemptListenerImplForTest(AppContext context,
                                           TaskHeartbeatHandler thh,
                                           ContainerHeartbeatHandler chh,
-                                          JobTokenSecretManager 
jobTokenSecretManager,
                                           String[] 
taskCommunicatorClassIdentifiers,
+                                          Configuration conf,
                                           boolean isPureLocalMode) {
-      super(context, thh, chh, jobTokenSecretManager, 
taskCommunicatorClassIdentifiers,
+      super(context, thh, chh, taskCommunicatorClassIdentifiers, conf,
           isPureLocalMode);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index dbf5054..85f9415 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -54,7 +54,7 @@ public class TezTestServiceContainerLauncher extends 
ContainerLauncher {
 
   // Configuration passed in here to set up final parameters
   public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
-    super(TezTestServiceContainerLauncher.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
     int numThreads = getContext().getInitialConfiguration().getInt(
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
@@ -69,13 +69,13 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     communicator.init(getContext().getInitialConfiguration());
     communicator.start();
   }
 
   @Override
-  public void serviceStop() {
+  public void shutdown() {
     communicator.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
index d3743e1..7b42296 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -27,7 +27,7 @@ public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher {
 
 
   public TezTestServiceNoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) {
-    super(TezTestServiceNoOpContainerLauncher.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/fc8a4ce5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 444498e..078ea79 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -23,7 +23,6 @@ import java.util.concurrent.RejectedExecutionException;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.Credentials;
@@ -75,20 +74,20 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-    this.communicator.init(conf);
+  public void initialize() throws Exception {
+    super.initialize();
+    this.communicator.init(getContext().getInitialConfiguration());
   }
 
   @Override
-  public void serviceStart() {
-    super.serviceStart();
+  public void start() {
+    super.start();
     this.communicator.start();
   }
 
   @Override
-  public void serviceStop() {
-    super.serviceStop();
+  public void shutdown() {
+    super.shutdown();
     this.communicator.stop();
   }
 
@@ -132,7 +131,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
     }
     // Have to register this up front right now. Otherwise, it's possible for the task to start
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
-    getTaskCommunicatorContext()
+    getContext()
         .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
     communicator.submitWork(requestProto, host, port,
         new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@@ -154,19 +153,19 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
               RemoteException re = (RemoteException) t;
               String message = re.toString();
               if (message.contains(RejectedExecutionException.class.getName())) {
-                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
               } else {
-                getTaskCommunicatorContext()
+                getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.toString());
               }
             } else {
               if (t instanceof IOException) {
-                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
               } else {
-                getTaskCommunicatorContext()
+                getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.getMessage());
               }
@@ -191,11 +190,11 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
     builder.setAmPort(getAddress().getPort());
     Credentials taskCredentials = new Credentials();
     // Credentials can change across DAGs. Ideally construct only once per DAG.
-    taskCredentials.addAll(getTaskCommunicatorContext().getCredentials());
+    taskCredentials.addAll(getContext().getCredentials());
 
     ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
     if (credentialsBinary == null) {
-      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialsBinary = serializeCredentials(getContext().getCredentials());
       credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
     } else {
       credentialsBinary = credentialsBinary.duplicate();

Reply via email to