Repository: tez
Updated Branches:
refs/heads/branch-0.8 a425f5d91 -> 4694a9efd
TEZ-3219. Allow service plugins to define log locations link for remotely run
task attempts. (hitesh)
(cherry picked from commit 7e9ed83184e1fefb4ecf8b560a60d2521c0f30e0)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4694a9ef
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4694a9ef
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4694a9ef
Branch: refs/heads/branch-0.8
Commit: 4694a9efd98f779f696a3c34a354e3e7bb8dc7bf
Parents: a425f5d
Author: Hitesh Shah <[email protected]>
Authored: Mon Apr 25 11:04:41 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Mon Apr 25 11:05:41 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ATSConstants.java | 7 ++
tez-dag/findbugs-exclude.xml | 2 +
.../java/org/apache/tez/dag/app/AppContext.java | 6 ++
.../org/apache/tez/dag/app/DAGAppMaster.java | 21 +++++
.../tez/dag/app/TaskCommunicatorManager.java | 30 ++++++
.../app/TaskCommunicatorManagerInterface.java | 8 ++
.../tez/dag/app/TaskCommunicatorWrapper.java | 12 +++
.../java/org/apache/tez/dag/app/dag/Vertex.java | 3 +
.../tez/dag/app/dag/impl/ServicePluginInfo.java | 99 ++++++++++++++++++++
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 55 +++++++----
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 30 +++++-
.../app/launcher/ContainerLauncherManager.java | 4 +
.../tez/dag/app/rm/TaskSchedulerManager.java | 5 +
.../dag/history/events/VertexFinishedEvent.java | 15 ++-
.../history/events/VertexInitializedEvent.java | 14 ++-
.../impl/HistoryEventJsonConversion.java | 8 ++
.../apache/tez/dag/history/utils/DAGUtils.java | 29 ++++++
.../serviceplugins/api/TaskCommunicator.java | 22 +++++
.../apache/tez/dag/app/TestRecoveryParser.java | 15 +--
.../tez/dag/app/dag/impl/TestDAGImpl.java | 5 +-
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 12 +--
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 13 ++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 4 +
.../TestHistoryEventsProtoConversion.java | 6 +-
.../impl/TestHistoryEventJsonConversion.java | 4 +-
.../ats/HistoryEventTimelineConversion.java | 16 +++-
.../ats/TestHistoryEventTimelineConversion.java | 59 +++++++++++-
.../java/org/apache/tez/test/TestRecovery.java | 32 +++----
29 files changed, 468 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a0a5ce6..b6c5520 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.4: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3219. Allow service plugins to define log locations link for remotely
run task attempts.
TEZ-3224. User payload is not initialized before creating vertex manager
plugin.
TEZ-3226. Tez UI 2: All DAGs UX improvements.
TEZ-3077. TezClient.waitTillReady should support timeout.
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 0b8c67d..c56582c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -90,6 +90,13 @@ public class ATSConstants {
public static final String LAST_DATA_EVENTS = "lastDataEvents";
public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
+ public static final String TASK_COMMUNICATOR_NAME = "taskCommunicatorName";
+ public static final String TASK_SCHEDULER_NAME = "taskSchedulerName";
+ public static final String CONTAINER_LAUNCHER_NAME = "containerLauncherName";
+ public static final String TASK_COMMUNICATOR_CLASS_NAME =
"taskCommunicatorClassName";
+ public static final String TASK_SCHEDULER_CLASS_NAME =
"taskSchedulerClassName";
+ public static final String CONTAINER_LAUNCHER_CLASS_NAME =
"containerLauncherClassName";
+ public static final String SERVICE_PLUGIN = "servicePlugin";
/* Counters-related keys */
public static final String COUNTER_GROUPS = "counterGroups";
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index f106c8b..0f3cdca 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -163,6 +163,8 @@
<Field name="recoveryEnabled"/>
<Field name="isLocal"/>
<Field name="hadoopShim"/>
+ <Field name="containerLauncherManager"/>
+ <Field name="taskCommunicatorManager"/>
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 30716da..45ce8c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -94,6 +94,8 @@ public interface AppContext {
TaskSchedulerManager getTaskScheduler();
+ TaskCommunicatorManagerInterface getTaskCommunicatorManager();
+
boolean isSession();
boolean isLocal();
@@ -127,6 +129,10 @@ public interface AppContext {
public String getTaskSchedulerName(int schedulerId);
public String getContainerLauncherName(int launcherId);
+ public String getTaskCommunicatorClassName(int taskCommId);
+ public String getTaskSchedulerClassName(int schedulerId);
+ public String getContainerLauncherClassName(int launcherId);
+
public HadoopShim getHadoopShim();
public DAGRecoveryData getDAGRecoveryData();
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 7d28497..7ca7118 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1589,6 +1589,11 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public TaskCommunicatorManagerInterface getTaskCommunicatorManager() {
+ return taskCommunicatorManager;
+ }
+
+ @Override
public boolean isSession() {
return isSession;
}
@@ -1680,6 +1685,22 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public String getTaskCommunicatorClassName(int taskCommId) {
+ return taskCommunicatorManager.getTaskCommunicatorClassName(taskCommId);
+ }
+
+ @Override
+ public String getTaskSchedulerClassName(int schedulerId) {
+ return taskSchedulerManager.getTaskSchedulerClassName(schedulerId);
+ }
+
+ @Override
+ public String getContainerLauncherClassName(int launcherId) {
+ return
containerLauncherManager.getContainerLauncherClassName(launcherId);
+ }
+
+
+ @Override
public HadoopShim getHadoopShim() {
return hadoopShim;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index cfb177b..c9d1f2e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -650,4 +650,34 @@ public class TaskCommunicatorManager extends
AbstractService implements
private void sendEvent(Event<?> event) {
context.getEventHandler().handle(event);
}
+
+ @Override
+ public String getTaskCommunicatorClassName(int taskCommId) {
+ return
taskCommunicators[taskCommId].getTaskCommunicator().getClass().getName();
+ }
+
+ @Override
+ public String getInProgressLogsUrl(int taskCommId, TezTaskAttemptID
attemptID, NodeId containerNodeId) {
+ try {
+ return taskCommunicators[taskCommId].getInProgressLogsUrl(attemptID,
containerNodeId);
+ } catch (Exception e) { // a failing communicator is logged, not propagated
+ LOG.warn("Failed to retrieve InProgressLogsUrl from TaskCommunicator"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId,
context)
+ + ", attemptId=" + attemptID, e);
+ }
+ return null; // reached only when the lookup above threw
+ }
+
+ @Override
+ public String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID
attemptID, NodeId containerNodeId) {
+ try {
+ return taskCommunicators[taskCommId].getCompletedLogsUrl(attemptID,
containerNodeId);
+ } catch (Exception e) { // a failing communicator is logged, not propagated
+ LOG.warn("Failed to retrieve CompletedLogsUrl from TaskCommunicator"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId,
context)
+ + ", attemptId=" + attemptID, e);
+ }
+ return null; // reached only when the lookup above threw
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
index e0f9852..254e74c 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
@@ -46,4 +47,11 @@ public interface TaskCommunicatorManagerInterface {
TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex);
void reportError(int taskCommIndex, ServicePluginError servicePluginError,
String diagnostics, DagInfo dagName);
+
+ String getTaskCommunicatorClassName(int taskCommId);
+
+ String getInProgressLogsUrl(int taskCommId, TezTaskAttemptID attemptID,
NodeId containerNodeId);
+
+ String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID,
NodeId containerNodeId);
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
index 4a75875..4afb427 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -80,4 +81,15 @@ public class TaskCommunicatorWrapper {
public TaskCommunicator getTaskCommunicator() {
return real;
}
+
+ public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId
containerNodeId)
+ throws Exception {
+ return real.getInProgressLogsUrl(attemptID, containerNodeId);
+ }
+
+ public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId
containerNodeId)
+ throws Exception {
+ return real.getCompletedLogsUrl(attemptID, containerNodeId);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 25fbf3a..1b3b39c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -49,6 +49,7 @@ import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.Edge;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -190,6 +191,8 @@ public interface Vertex extends Comparable<Vertex> {
public int getContainerLauncherIdentifier();
public int getTaskCommunicatorIdentifier();
+ public ServicePluginInfo getServicePluginInfo();
+
public long getInitTime();
public long getStartTime();
public long getFinishTime();
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java
new file mode 100644
index 0000000..2ff1ee5
--- /dev/null
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+
+public class ServicePluginInfo {
+
+ private String containerLauncherName;
+ private String taskSchedulerName;
+ private String taskCommunicatorName;
+ private String containerLauncherClassName;
+ private String taskSchedulerClassName;
+ private String taskCommunicatorClassName;
+
+ public ServicePluginInfo() {
+ }
+
+ public String getContainerLauncherName() {
+ return containerLauncherName;
+ }
+
+ public ServicePluginInfo setContainerLauncherName(String
containerLauncherName) {
+ this.containerLauncherName = containerLauncherName;
+ return this;
+ }
+
+ public String getTaskSchedulerName() {
+ return taskSchedulerName;
+ }
+
+ public ServicePluginInfo setTaskSchedulerName(String taskSchedulerName) {
+ this.taskSchedulerName = taskSchedulerName;
+ return this;
+ }
+
+ public String getTaskCommunicatorName() {
+ return taskCommunicatorName;
+ }
+
+ public ServicePluginInfo setTaskCommunicatorName(String
taskCommunicatorName) {
+ this.taskCommunicatorName = taskCommunicatorName;
+ return this;
+ }
+
+ public String getContainerLauncherClassName() {
+ return containerLauncherClassName;
+ }
+
+ public ServicePluginInfo setContainerLauncherClassName(String
containerLauncherClassName) {
+ this.containerLauncherClassName = containerLauncherClassName;
+ return this;
+ }
+
+ public String getTaskSchedulerClassName() {
+ return taskSchedulerClassName;
+ }
+
+ public ServicePluginInfo setTaskSchedulerClassName(String
taskSchedulerClassName) {
+ this.taskSchedulerClassName = taskSchedulerClassName;
+ return this;
+ }
+
+ public String getTaskCommunicatorClassName() {
+ return taskCommunicatorClassName;
+ }
+
+ public ServicePluginInfo setTaskCommunicatorClassName(String
taskCommunicatorClassName) {
+ this.taskCommunicatorClassName = taskCommunicatorClassName;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "ServicePluginInfo {" +
+ "containerLauncherName=" + containerLauncherName +
+ ", taskSchedulerName=" + taskSchedulerName +
+ ", taskCommunicatorName=" + taskCommunicatorName +
+ ", containerLauncherClassName=" + containerLauncherClassName +
+ ", taskSchedulerClassName=" + taskSchedulerClassName +
+ ", taskCommunicatorClassName=" + taskCommunicatorClassName +
+ " }";
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6169a7b..6d9247e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
@@ -1107,31 +1108,49 @@ public class TaskAttemptImpl implements TaskAttempt,
private String getInProgressLogsUrl() {
String inProgressLogsUrl = null;
- if (containerId != null && nodeHttpAddress != null) {
- final String containerIdStr = containerId.toString();
- inProgressLogsUrl = nodeHttpAddress
- + "/" + "node/containerlogs"
- + "/" + containerIdStr
- + "/" + this.appContext.getUser();
+ if (getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+ TezConstants.getTezYarnServicePluginName())
+ ||
getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+ TezConstants.getTezUberServicePluginName())) {
+ if (containerId != null && nodeHttpAddress != null) {
+ final String containerIdStr = containerId.toString();
+ inProgressLogsUrl = nodeHttpAddress
+ + "/" + "node/containerlogs"
+ + "/" + containerIdStr
+ + "/" + this.appContext.getUser();
+ }
+ } else {
+ inProgressLogsUrl =
appContext.getTaskCommunicatorManager().getInProgressLogsUrl(
+ getVertex().getTaskCommunicatorIdentifier(),
+ attemptId, containerNodeId);
}
return inProgressLogsUrl;
}
private String getCompletedLogsUrl() {
String completedLogsUrl = null;
- if (containerId != null && containerNodeId != null && nodeHttpAddress !=
null) {
- final String containerIdStr = containerId.toString();
- if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
- && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
- String contextStr = "v_" + getVertex().getName()
- + "_" + this.attemptId.toString();
- completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
- + "/" + containerNodeId.toString()
- + "/" + containerIdStr
- + "/" + contextStr
- + "/" + this.appContext.getUser();
+ if (getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+ TezConstants.getTezYarnServicePluginName())
+ ||
getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+ TezConstants.getTezUberServicePluginName())) {
+ if (containerId != null && containerNodeId != null && nodeHttpAddress !=
null) {
+ final String containerIdStr = containerId.toString();
+ if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
+ && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
+ String contextStr = "v_" + getVertex().getName()
+ + "_" + this.attemptId.toString();
+ completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
+ + "/" + containerNodeId.toString()
+ + "/" + containerIdStr
+ + "/" + contextStr
+ + "/" + this.appContext.getUser();
+ }
}
+ } else {
+ completedLogsUrl =
appContext.getTaskCommunicatorManager().getCompletedLogsUrl(
+ getVertex().getTaskCommunicatorIdentifier(),
+ attemptId, containerNodeId);
}
return completedLogsUrl;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ea202f7..b22af1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -247,6 +247,9 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
@VisibleForTesting
final int taskCommunicatorIdentifier;
+ final ServicePluginInfo servicePluginInfo;
+
+
//fields initialized in init
@VisibleForTesting
@@ -1009,6 +1012,17 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
LOG.error("Failed to get index for containerLauncher: " +
containerLauncherName);
throw e;
}
+ this.servicePluginInfo = new ServicePluginInfo()
+ .setContainerLauncherName(
+
appContext.getContainerLauncherName(this.containerLauncherIdentifier))
+
.setTaskSchedulerName(appContext.getTaskSchedulerName(this.taskSchedulerIdentifier))
+
.setTaskCommunicatorName(appContext.getTaskCommunicatorName(this.taskCommunicatorIdentifier))
+ .setContainerLauncherClassName(
+
appContext.getContainerLauncherClassName(this.containerLauncherIdentifier))
+ .setTaskSchedulerClassName(
+ appContext.getTaskSchedulerClassName(this.taskSchedulerIdentifier))
+ .setTaskCommunicatorClassName(
+
appContext.getTaskCommunicatorClassName(this.taskCommunicatorIdentifier));
StringBuilder sb = new StringBuilder();
sb.append("Running vertex: ").append(logIdentifier).append(" : ")
@@ -1043,6 +1057,11 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
}
@Override
+ public ServicePluginInfo getServicePluginInfo() {
+ return servicePluginInfo;
+ }
+
+ @Override
public boolean isSpeculationEnabled() {
return isSpeculationEnabled;
}
@@ -1923,8 +1942,9 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
void logJobHistoryVertexInitializedEvent() {
if (recoveryData == null || !recoveryData.shouldSkipInit()) {
VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId,
vertexName,
- initTimeRequested, initedTime, numTasks,
- getProcessorName(), getAdditionalInputs(), initGeneratedEvents);
+ initTimeRequested, initedTime, numTasks,
+ getProcessorName(), getAdditionalInputs(), initGeneratedEvents,
+ servicePluginInfo);
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGId(), initEvt));
}
@@ -1989,9 +2009,9 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS,
failedTaskAttemptCount.get());
taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS,
killedTaskAttemptCount.get());
- VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, numTasks, initTimeRequested,
- initedTime, startTimeRequested, startedTime, finishTime, finalState,
diagnostics,
- counters, getVertexStats(), taskStats);
+ VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, numTasks,
+ initTimeRequested, initedTime, startTimeRequested, startedTime,
finishTime, finalState,
+ diagnostics, counters, getVertexStats(), taskStats, servicePluginInfo);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 1f5151b..f2c1cff 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -279,4 +279,8 @@ public class ContainerLauncherManager extends
AbstractService
private void sendEvent(Event<?> event) {
appContext.getEventHandler().handle(event);
}
+
+ public String getContainerLauncherClassName(int containerLauncherIndex) {
+ return
containerLaunchers[containerLauncherIndex].getContainerLauncher().getClass().getName();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 16f9a28..e68c9b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -996,4 +996,9 @@ public class TaskSchedulerManager extends AbstractService
implements
return historyUrl;
}
+
+ public String getTaskSchedulerClassName(int taskSchedulerIndex) {
+ return
taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index f914165..a2cdae2 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tez.common.counters.TezCounters;
@@ -55,12 +56,14 @@ public class VertexFinishedEvent implements HistoryEvent,
SummaryEvent {
private boolean fromSummary = false;
private VertexStats vertexStats;
private Map<String, Integer> vertexTaskStats;
+ private ServicePluginInfo servicePluginInfo;
public VertexFinishedEvent(TezVertexID vertexId, String vertexName, int
numTasks, long initRequestedTime,
long initedTime, long startRequestedTime, long
startedTime,
long finishTime, VertexState state, String
diagnostics,
TezCounters counters, VertexStats vertexStats,
- Map<String, Integer> vertexTaskStats) {
+ Map<String, Integer> vertexTaskStats,
+ ServicePluginInfo servicePluginInfo) {
this.vertexName = vertexName;
this.vertexID = vertexId;
this.numTasks = numTasks;
@@ -74,6 +77,7 @@ public class VertexFinishedEvent implements HistoryEvent,
SummaryEvent {
this.tezCounters = counters;
this.vertexStats = vertexStats;
this.vertexTaskStats = vertexTaskStats;
+ this.servicePluginInfo = servicePluginInfo;
}
public VertexFinishedEvent() {
@@ -147,7 +151,9 @@ public class VertexFinishedEvent implements HistoryEvent,
SummaryEvent {
+ ", counters=" + ( tezCounters == null ? "null" :
tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", "
"))
+ ", vertexStats=" + (vertexStats == null ? "null" :
vertexStats.toString())
- + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" :
vertexTaskStats.toString());
+ + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" :
vertexTaskStats.toString())
+ + ", servicePluginInfo="
+ + (servicePluginInfo != null ? servicePluginInfo : "null");
}
public TezVertexID getVertexID() {
@@ -227,4 +233,9 @@ public class VertexFinishedEvent implements HistoryEvent,
SummaryEvent {
public boolean isFromSummary() {
return fromSummary;
}
+
+ public ServicePluginInfo getServicePluginInfo() {
+ return servicePluginInfo;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 052908b..90099fc 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -30,6 +30,7 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.utils.TezEventUtils;
@@ -51,6 +52,7 @@ public class VertexInitializedEvent implements HistoryEvent {
private String processorName;
private Map<String, RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> additionalInputs;
private List<TezEvent> initGeneratedEvents;
+ private ServicePluginInfo servicePluginInfo;
public VertexInitializedEvent() {
}
@@ -59,7 +61,7 @@ public class VertexInitializedEvent implements HistoryEvent {
String vertexName, long initRequestedTime, long initedTime,
int numTasks, String processorName,
Map<String, RootInputLeafOutput<InputDescriptor,
InputInitializerDescriptor>> additionalInputs,
- List<TezEvent> initGeneratedEvents) {
+ List<TezEvent> initGeneratedEvents, ServicePluginInfo servicePluginInfo)
{
this.vertexName = vertexName;
this.vertexID = vertexId;
this.initRequestedTime = initRequestedTime;
@@ -68,6 +70,7 @@ public class VertexInitializedEvent implements HistoryEvent {
this.processorName = processorName;
this.additionalInputs = additionalInputs;
this.initGeneratedEvents = initGeneratedEvents;
+ this.servicePluginInfo = servicePluginInfo;
}
@Override
@@ -173,7 +176,9 @@ public class VertexInitializedEvent implements HistoryEvent
{
+ ", additionalInputsCount="
+ (additionalInputs != null ? additionalInputs.size() : 0)
+ ", initGeneratedEventsCount="
- + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0);
+ + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0)
+ + ", servicePluginInfo="
+ + (servicePluginInfo != null ? servicePluginInfo : "null");
}
public TezVertexID getVertexID() {
@@ -208,4 +213,9 @@ public class VertexInitializedEvent implements HistoryEvent
{
public List<TezEvent> getInitGeneratedEvents() {
return initGeneratedEvents;
}
+
+ public ServicePluginInfo getServicePluginInfo() {
+ return servicePluginInfo;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 75116c8..a767fbf 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -716,6 +716,10 @@ public class HistoryEventJsonConversion {
otherInfo.put(entry.getKey(), entry.getValue().intValue());
}
}
+ if (event.getServicePluginInfo() != null) {
+ otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+ DAGUtils.convertServicePluginToJSON(event.getServicePluginInfo()));
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
@@ -787,6 +791,10 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.INIT_TIME, event.getInitedTime());
otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+ if (event.getServicePluginInfo() != null) {
+ otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+ DAGUtils.convertServicePluginToJSON(event.getServicePluginInfo()));
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 781120c..f1ac0ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.history.logging.EntityTypes;
@@ -405,6 +406,34 @@ public class DAGUtils {
return vertexStatsMap;
}
+ public static JSONObject convertServicePluginToJSON(
+ ServicePluginInfo servicePluginInfo) {
+ JSONObject jsonObject = new
JSONObject(convertServicePluginToATSMap(servicePluginInfo));
+ return jsonObject;
+ }
+
+ public static Map<String,Object> convertServicePluginToATSMap(
+ ServicePluginInfo servicePluginInfo) {
+ Map<String, Object> servicePluginMap = new LinkedHashMap<String, Object>();
+ if (servicePluginInfo == null) {
+ return servicePluginMap;
+ }
+ servicePluginMap.put(ATSConstants.TASK_SCHEDULER_NAME,
+ servicePluginInfo.getTaskSchedulerName());
+ servicePluginMap.put(ATSConstants.TASK_SCHEDULER_CLASS_NAME,
+ servicePluginInfo.getTaskSchedulerClassName());
+ servicePluginMap.put(ATSConstants.TASK_COMMUNICATOR_NAME,
+ servicePluginInfo.getTaskCommunicatorName());
+ servicePluginMap.put(ATSConstants.TASK_COMMUNICATOR_CLASS_NAME,
+ servicePluginInfo.getTaskCommunicatorClassName());
+ servicePluginMap.put(ATSConstants.CONTAINER_LAUNCHER_NAME,
+ servicePluginInfo.getContainerLauncherName());
+ servicePluginMap.put(ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME,
+ servicePluginInfo.getContainerLauncherClassName());
+ return servicePluginMap;
+ }
+
+
public static Map<String,Object> convertEdgeProperty(
EdgeProperty edge) {
Map<String, Object> jsonDescriptor = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
index 8f919d1..34debd4 100644
---
a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
+++
b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
@@ -35,6 +35,7 @@ import java.util.Map;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ServicePluginLifecycle;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -229,4 +230,25 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* This will cause the app to shutdown.
*/
public abstract Object getMetaInfo() throws ServicePluginException;
+
+ /**
+ * Return a URL that can be used as a link to the logs for a running attempt.
+ * @param attemptID Attempt ID for which the log link should be provided
+ * @param containerNodeId Node Id on which the attempt is meant to have run
+ * @return URL to logs for the attempt
+ */
+ public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId
containerNodeId) {
+ return null;
+ }
+
+ /**
+ * Return a URL that can be used as a link to the logs for a completed attempt.
+ * @param attemptID Attempt ID for which the log link should be provided
+ * @param containerNodeId Node Id on which the attempt is meant to have run
+ * @return URL to logs for the attempt
+ */
+ public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId
containerNodeId) {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index 962b230..f4edf9e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -453,7 +453,7 @@ public class TestRecoveryParser {
rService.handle(new DAGHistoryEvent(dagID,
new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L,
0L, 0L, 0L, VertexState.SUCCEEDED,
- "", null, null, null)));
+ "", null, null, null, null)));
rService.stop();
DAGRecoveryData dagData = parser.parseRecoveryData();
@@ -531,11 +531,11 @@ public class TestRecoveryParser {
rService.handle(new DAGHistoryEvent(dagID,
new VertexFinishedEvent(v0, "v1", 10, 0L, 0L,
0L, 0L, 0L, VertexState.SUCCEEDED,
- "", null, null, null)));
+ "", null, null, null, null)));
rService.handle(new DAGHistoryEvent(dagID,
new VertexFinishedEvent(v1, "v1", 10, 0L, 0L,
0L, 0L, 0L, VertexState.SUCCEEDED,
- "", null, null, null)));
+ "", null, null, null, null)));
rService.stop();
DAGRecoveryData dagData = parser.parseRecoveryData();
@@ -573,7 +573,7 @@ public class TestRecoveryParser {
rService.handle(new DAGHistoryEvent(dagID,
new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L,
0L, 0L, 0L, VertexState.SUCCEEDED,
- "", null, null, null)));
+ "", null, null, null, null)));
rService.stop();
DAGRecoveryData dagData = parser.parseRecoveryData();
@@ -652,18 +652,19 @@ public class TestRecoveryParser {
TezVertexID v1Id = TezVertexID.getInstance(dagID, 1);
TezVertexID v2Id = TezVertexID.getInstance(dagID, 2);
// v0 VertexInitializedEvent
- VertexInitializedEvent v0InitedEvent = new VertexInitializedEvent(v0Id,
"v0", 200L, 400L, 2, null, null, null);
+ VertexInitializedEvent v0InitedEvent = new VertexInitializedEvent(
+ v0Id, "v0", 200L, 400L, 2, null, null, null, null);
rService.handle(new DAGHistoryEvent(dagID, v0InitedEvent));
// v1 VertexFinishedEvent(KILLED)
VertexFinishedEvent v1FinishedEvent = new VertexFinishedEvent(v1Id, "v1",
2, 300L, 400L,
500L, 600L, 700L, VertexState.KILLED,
- "", null, null, null);
+ "", null, null, null, null);
rService.handle(new DAGHistoryEvent(dagID, v1FinishedEvent));
// v2 VertexInitializedEvent -> VertexStartedEvent
List<TezEvent> initGeneratedEvents = Lists.newArrayList(
new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])),
null));
VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id,
"v2", 200L, 300L,
- 2, null, null, initGeneratedEvents);
+ 2, null, null, initGeneratedEvents, null);
VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 0L);
rService.handle(new DAGHistoryEvent(dagID, v2InitedEvent));
rService.handle(new DAGHistoryEvent(dagID, v2StartedEvent));
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 480e3cf..4471278 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -19,9 +19,9 @@
package org.apache.tez.dag.app.dag.impl;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
@@ -986,6 +987,8 @@ public class TestDAGImpl {
doReturn(clusterInfo).when(dagWithCustomEdgeAppContext).getClusterInfo();
dispatcher.register(TaskAttemptEventType.class, new
TaskAttemptEventDisptacher2());
dispatcher.register(AMSchedulerEventType.class, new
AMSchedulerEventHandler());
+
when(dagWithCustomEdgeAppContext.getContainerLauncherName(anyInt())).thenReturn(
+ TezConstants.getTezYarnServicePluginName());
}
private void initDAG(DAGImpl impl) {
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index be1821b..3c284ec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -700,7 +700,7 @@ public class TestDAGRecovery {
List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
"vertex1", 0L, v1InitedTime,
- v1NumTask, "", null, inputGeneratedTezEvents);
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
VertexRecoveryData vertexRecoveryData = new
VertexRecoveryData(v1InitedEvent,
null, null, null, null, false);
doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
@@ -732,7 +732,7 @@ public class TestDAGRecovery {
List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
"vertex1", 0L, v1InitedTime,
- v1NumTask, "", null, inputGeneratedTezEvents);
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
0L, v1NumTask, null, null, null, true);
VertexRecoveryData vertexRecoveryData = new
VertexRecoveryData(v1InitedEvent,
@@ -772,7 +772,7 @@ public class TestDAGRecovery {
List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
"vertex1", 0L, v1InitedTime,
- v1NumTask, "", null, inputGeneratedTezEvents);
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
0L, v1NumTask, null, null, null, true);
VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L,
v1StartedTime);
@@ -803,7 +803,7 @@ public class TestDAGRecovery {
List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
"vertex1", 0L, v1InitedTime,
- v1NumTask, "", null, inputGeneratedTezEvents);
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String,
InputSpecUpdate>();
VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
0L, v1NumTask, null, null, rootInputSpecs, true);
@@ -911,7 +911,7 @@ public class TestDAGRecovery {
List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id,
"vertex1", 0L, v1InitedTime,
- v1NumTask, "", null, inputGeneratedTezEvents);
+ v1NumTask, "", null, inputGeneratedTezEvents, null);
Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String,
InputSpecUpdate>();
VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v1Id,
0L, v1NumTask, null, null, rootInputSpecs, true);
@@ -1079,7 +1079,7 @@ public class TestDAGRecovery {
VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id,
"vertex2", 0L, v1InitedTime,
- v1NumTask, "", null, null);
+ v1NumTask, "", null, null, null);
VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new
VertexConfigurationDoneEvent(v2Id,
0L, v1NumTask, null, null, null, false);
VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L,
v1StartedTime);
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 68cfc39..e4cd956 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
@@ -139,6 +141,9 @@ public class TestTaskAttempt {
AppContext appCtx;
Task mockTask;
TaskLocationHint locationHint;
+ Vertex mockVertex;
+ ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
+ .setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
@BeforeClass
public static void setup() {
@@ -148,7 +153,14 @@ public class TestTaskAttempt {
@Before
public void setupTest() {
appCtx = mock(AppContext.class);
+ when(appCtx.getContainerLauncherName(anyInt())).thenReturn(
+ TezConstants.getTezYarnServicePluginName());
+
mockTask = mock(Task.class);
+ mockVertex = mock(Vertex.class);
+ when(mockTask.getVertex()).thenReturn(mockVertex);
+ when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
+
HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
LogManager.getRootLogger().setLevel(Level.DEBUG);
@@ -1737,7 +1749,6 @@ public class TestTaskAttempt {
}
- Vertex mockVertex = mock(Vertex.class);
boolean inputFailedReported = false;
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index d2d9a07..6b30a24 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
@@ -2354,6 +2355,9 @@ public class TestVertexImpl {
dispatcher = new DrainDispatcher();
appContext = mock(AppContext.class);
when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+ when(appContext.getContainerLauncherName(anyInt())).thenReturn(
+ TezConstants.getTezYarnServicePluginName());
+
thh = mock(TaskHeartbeatHandler.class);
historyEventHandler = mock(HistoryEventHandler.class);
TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 5e7e906..7d3faf9 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -276,7 +276,7 @@ public class TestHistoryEventsProtoConversion {
VertexInitializedEvent event = new VertexInitializedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
- "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents);
+ "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents,
null);
VertexInitializedEvent deserializedEvent = (VertexInitializedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -386,7 +386,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l,
VertexState.ERROR,
- null, null, null, null);
+ null, null, null, null, null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(),
deserializedEvent.getVertexID());
@@ -401,7 +401,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l,
VertexState.ERROR,
- "diagnose", new TezCounters(), new VertexStats(), null);
+ "diagnose", new TezCounters(), new VertexStats(), null, null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(),
deserializedEvent.getVertexID());
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 5c596c5..9477118 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -138,7 +138,7 @@ public class TestHistoryEventJsonConversion {
break;
case VERTEX_INITIALIZED:
event = new VertexInitializedEvent(tezVertexID, "v1",
random.nextInt(), random.nextInt(),
- random.nextInt(), "proc", null, null);
+ random.nextInt(), "proc", null, null, null);
break;
case VERTEX_STARTED:
event = new VertexStartedEvent(tezVertexID, random.nextInt(),
random.nextInt());
@@ -149,7 +149,7 @@ public class TestHistoryEventJsonConversion {
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", 1,
random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(),
VertexState.ERROR,
- null, null, null, null);
+ null, null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(),
random.nextInt());
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git
a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 1e77ce8..62f0937 100644
---
a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -515,8 +515,12 @@ public class HistoryEventTimelineConversion {
atsEntity.addEvent(startEvt);
atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
- atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL,
event.getInProgressLogsUrl());
- atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL,
event.getCompletedLogsUrl());
+ if (event.getInProgressLogsUrl() != null) {
+ atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL,
event.getInProgressLogsUrl());
+ }
+ if (event.getCompletedLogsUrl() != null) {
+ atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL,
event.getCompletedLogsUrl());
+ }
atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS,
event.getNodeHttpAddress());
atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID,
event.getContainerId().toString());
@@ -615,6 +619,10 @@ public class HistoryEventTimelineConversion {
DAGUtils.convertCountersToATSMap(event.getTezCounters()));
atsEntity.addOtherInfo(ATSConstants.STATS,
DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+ if (event.getServicePluginInfo() != null) {
+ atsEntity.addOtherInfo(ATSConstants.SERVICE_PLUGIN,
+ DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
+ }
final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
if (vertexTaskStats != null) {
@@ -651,6 +659,10 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME,
event.getProcessorName());
+ if (event.getServicePluginInfo() != null) {
+ atsEntity.addOtherInfo(ATSConstants.SERVICE_PLUGIN,
+ DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
+ }
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index abfd757..f622056 100644
---
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.app.web.AMWebController;
@@ -157,7 +158,7 @@ public class TestHistoryEventTimelineConversion {
break;
case VERTEX_INITIALIZED:
event = new VertexInitializedEvent(tezVertexID, "v1",
random.nextInt(), random.nextInt(),
- random.nextInt(), "proc", null, null);
+ random.nextInt(), "proc", null, null, null);
break;
case VERTEX_STARTED:
event = new VertexStartedEvent(tezVertexID, random.nextInt(),
random.nextInt());
@@ -168,7 +169,7 @@ public class TestHistoryEventTimelineConversion {
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", 1,
random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(),
VertexState.ERROR,
- null, null, null, null);
+ null, null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(),
random.nextInt());
@@ -677,13 +678,19 @@ public class TestHistoryEventTimelineConversion {
((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
}
+ @SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertVertexInitializedEvent() {
long initRequestedTime = random.nextLong();
long initedTime = random.nextLong();
int numTasks = random.nextInt();
VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID,
"v1", initRequestedTime,
- initedTime, numTasks, "proc", null, null);
+ initedTime, numTasks, "proc", null, null,
+ new ServicePluginInfo().setContainerLauncherName("abc")
+ .setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
+ .setContainerLauncherClassName("abc1")
+ .setTaskSchedulerClassName("def1")
+ .setTaskCommunicatorClassName("ghi1"));
TimelineEntity timelineEntity =
HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(),
timelineEntity.getEntityType());
@@ -720,6 +727,25 @@ public class TestHistoryEventTimelineConversion {
((Long)
timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
Assert.assertEquals(numTasks,
((Integer)
timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
+
Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN));
+ Assert.assertEquals("abc",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.CONTAINER_LAUNCHER_NAME));
+ Assert.assertEquals("def",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_SCHEDULER_NAME));
+ Assert.assertEquals("ghi",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_COMMUNICATOR_NAME));
+ Assert.assertEquals("abc1",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
+ Assert.assertEquals("def1",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_SCHEDULER_CLASS_NAME));
+ Assert.assertEquals("ghi1",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
}
@Test(timeout = 5000)
@@ -756,6 +782,7 @@ public class TestHistoryEventTimelineConversion {
timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
}
+ @SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertVertexFinishedEvent() {
long initRequestedTime = random.nextLong();
@@ -770,7 +797,12 @@ public class TestHistoryEventTimelineConversion {
VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,
initRequestedTime,
initedTime, startRequestedTime, startTime, finishTime,
VertexState.ERROR,
- "diagnostics", null, vertexStats, taskStats);
+ "diagnostics", null, vertexStats, taskStats,
+ new ServicePluginInfo().setContainerLauncherName("abc")
+ .setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
+ .setContainerLauncherClassName("abc1")
+ .setTaskSchedulerClassName("def1")
+ .setTaskCommunicatorClassName("ghi1"));
TimelineEntity timelineEntity =
HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(),
timelineEntity.getEntityType());
@@ -801,6 +833,25 @@ public class TestHistoryEventTimelineConversion {
timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
Assert.assertEquals("diagnostics",
timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN));
+ Assert.assertEquals("abc",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.CONTAINER_LAUNCHER_NAME));
+ Assert.assertEquals("def",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_SCHEDULER_NAME));
+ Assert.assertEquals("ghi",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_COMMUNICATOR_NAME));
+ Assert.assertEquals("abc1",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
+ Assert.assertEquals("def1",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_SCHEDULER_CLASS_NAME));
+ Assert.assertEquals("ghi1",
+ ((Map<String,
Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+ ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index 3f669c6..b9229e2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -165,13 +165,13 @@ public class TestRecovery {
ApplicationAttemptId.newInstance(appId, 1), null)),
new SimpleShutdownCondition(TIMING.POST,
new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0,
- "", null, initGeneratedEvents)),
+ "", null, initGeneratedEvents, null)),
new SimpleShutdownCondition(TIMING.POST,
new VertexInitializedEvent(vertexId1, "Summation", 0L, 0L, 0,
- "", null, null)),
+ "", null, null, null)),
new SimpleShutdownCondition(TIMING.POST,
new VertexInitializedEvent(vertexId2, "Sorter", 0L, 0L, 0, "",
- null, null)),
+ null, null, null)),
new SimpleShutdownCondition(TIMING.POST,
new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null,
@@ -194,15 +194,15 @@ public class TestRecovery {
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(),
- new VertexStats(), new HashMap<String, Integer>())),
+ new VertexStats(), new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(),
- new VertexStats(), new HashMap<String, Integer>())),
+ new VertexStats(), new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(),
- new VertexStats(), new HashMap<String, Integer>())),
+ new VertexStats(), new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(
TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
@@ -376,11 +376,11 @@ public class TestRecovery {
"dagName", new HashMap<String, Integer>(), ApplicationAttemptId
.newInstance(appId, 1), null)),
new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
- vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents)),
+ vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents, null)),
new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
- vertexId1, "streamingSide", 0L, 0L, 0, "", null, null)),
+ vertexId1, "streamingSide", 0L, 0L, 0, "", null, null, null)),
new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
- vertexId2, "joiner", 0L, 0L, 0, "", null, null)),
+ vertexId2, "joiner", 0L, 0L, 0, "", null, null, null)),
new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
vertexId0, 0L, 0L)),
@@ -404,15 +404,15 @@ public class TestRecovery {
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
- new HashMap<String, Integer>())),
+ new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
- new HashMap<String, Integer>())),
+ new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
- new HashMap<String, Integer>())),
+ new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID
.getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
@@ -574,7 +574,7 @@ public class TestRecovery {
0L, "username", "dagName")),
new SimpleShutdownCondition(TIMING.POST,
new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0,
- "", null, initGeneratedEvents)),
+ "", null, initGeneratedEvents, null)),
new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
vertexId0, 0L, 0L)),
new SimpleShutdownCondition(TIMING.POST,
@@ -592,15 +592,15 @@ public class TestRecovery {
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(),
- new VertexStats(), new HashMap<String, Integer>())),
+ new VertexStats(), new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(),
- new VertexStats(), new HashMap<String, Integer>())),
+ new VertexStats(), new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
VertexState.SUCCEEDED, "", new TezCounters(),
- new VertexStats(), new HashMap<String, Integer>())),
+ new VertexStats(), new HashMap<String, Integer>(), null)),
new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent(
dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(),
"username", "dagName", new HashMap<String, Integer>(),