Repository: tez Updated Branches: refs/heads/master 515b92b01 -> 7e9ed8318
TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7e9ed831 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7e9ed831 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7e9ed831 Branch: refs/heads/master Commit: 7e9ed83184e1fefb4ecf8b560a60d2521c0f30e0 Parents: 515b92b Author: Hitesh Shah <[email protected]> Authored: Mon Apr 25 11:04:41 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon Apr 25 11:04:41 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/tez/common/ATSConstants.java | 7 ++ tez-dag/findbugs-exclude.xml | 2 + .../java/org/apache/tez/dag/app/AppContext.java | 6 ++ .../org/apache/tez/dag/app/DAGAppMaster.java | 21 +++++ .../tez/dag/app/TaskCommunicatorManager.java | 30 ++++++ .../app/TaskCommunicatorManagerInterface.java | 8 ++ .../tez/dag/app/TaskCommunicatorWrapper.java | 12 +++ .../java/org/apache/tez/dag/app/dag/Vertex.java | 3 + .../tez/dag/app/dag/impl/ServicePluginInfo.java | 99 ++++++++++++++++++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 55 +++++++---- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 30 +++++- .../app/launcher/ContainerLauncherManager.java | 4 + .../tez/dag/app/rm/TaskSchedulerManager.java | 5 + .../dag/history/events/VertexFinishedEvent.java | 15 ++- .../history/events/VertexInitializedEvent.java | 14 ++- .../impl/HistoryEventJsonConversion.java | 8 ++ .../apache/tez/dag/history/utils/DAGUtils.java | 29 ++++++ .../serviceplugins/api/TaskCommunicator.java | 22 +++++ .../apache/tez/dag/app/TestRecoveryParser.java | 15 +-- .../tez/dag/app/dag/impl/TestDAGImpl.java | 5 +- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 12 +-- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 13 ++- .../tez/dag/app/dag/impl/TestVertexImpl.java | 4 + .../TestHistoryEventsProtoConversion.java | 6 +- .../impl/TestHistoryEventJsonConversion.java | 4 +- .../ats/HistoryEventTimelineConversion.java | 16 +++- .../ats/TestHistoryEventTimelineConversion.java | 59 +++++++++++- .../java/org/apache/tez/test/TestRecovery.java | 32 +++---- 29 files changed, 469 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c471c87..f9f295f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.9.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. TEZ-3224. User payload is not initialized before creating vertex manager plugin. TEZ-3226. Tez UI 2: All DAGs UX improvements. TEZ-3077. TezClient.waitTillReady should support timeout. @@ -21,6 +22,7 @@ Release 0.8.4: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. TEZ-3224. User payload is not initialized before creating vertex manager plugin. TEZ-3226. Tez UI 2: All DAGs UX improvements. TEZ-3077. TezClient.waitTillReady should support timeout. http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 0b8c67d..c56582c 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -90,6 +90,13 @@ public class ATSConstants { public static final String LAST_DATA_EVENTS = "lastDataEvents"; public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers"; public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt"; + public static final String TASK_COMMUNICATOR_NAME = "taskCommunicatorName"; + public static final String TASK_SCHEDULER_NAME = "taskSchedulerName"; + public static final String CONTAINER_LAUNCHER_NAME = "containerLauncherName"; + public static final String TASK_COMMUNICATOR_CLASS_NAME = "taskCommunicatorClassName"; + public static final String TASK_SCHEDULER_CLASS_NAME = "taskSchedulerClassName"; + public static final String CONTAINER_LAUNCHER_CLASS_NAME = "containerLauncherClassName"; + public static final String SERVICE_PLUGIN = "servicePlugin"; /* Counters-related keys */ public static final String COUNTER_GROUPS = "counterGroups"; http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index f106c8b..0f3cdca 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -163,6 +163,8 @@ <Field name="recoveryEnabled"/> <Field name="isLocal"/> <Field name="hadoopShim"/> + <Field name="containerLauncherManager"/> + <Field name="taskCommunicatorManager"/> </Or> <Bug pattern="IS2_INCONSISTENT_SYNC"/> </Match> http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index 30716da..45ce8c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -94,6 +94,8 @@ public interface AppContext { TaskSchedulerManager getTaskScheduler(); + TaskCommunicatorManagerInterface getTaskCommunicatorManager(); + boolean isSession(); boolean isLocal(); @@ -127,6 +129,10 @@ public interface AppContext { public String getTaskSchedulerName(int schedulerId); public String getContainerLauncherName(int launcherId); + public String getTaskCommunicatorClassName(int taskCommId); + public String getTaskSchedulerClassName(int schedulerId); + public String getContainerLauncherClassName(int launcherId); + public HadoopShim getHadoopShim(); public DAGRecoveryData getDAGRecoveryData(); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 7d28497..7ca7118 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1589,6 +1589,11 @@ public class DAGAppMaster extends AbstractService { } @Override + public TaskCommunicatorManagerInterface getTaskCommunicatorManager() { + return taskCommunicatorManager; + } + + @Override public boolean isSession() { return isSession; } @@ -1680,6 +1685,22 @@ public class DAGAppMaster extends AbstractService { } @Override + public String getTaskCommunicatorClassName(int taskCommId) { + return taskCommunicatorManager.getTaskCommunicatorClassName(taskCommId); + } + + @Override + public String getTaskSchedulerClassName(int schedulerId) { + return taskSchedulerManager.getTaskSchedulerClassName(schedulerId); + } + + @Override + public String getContainerLauncherClassName(int launcherId) { + return containerLauncherManager.getContainerLauncherClassName(launcherId); + } + + + @Override public HadoopShim getHadoopShim() { return hadoopShim; } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index cfb177b..c9d1f2e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -650,4 +650,34 @@ public class TaskCommunicatorManager extends AbstractService implements private void sendEvent(Event<?> event) { context.getEventHandler().handle(event); } + + @Override + public String getTaskCommunicatorClassName(int taskCommId) { + return taskCommunicators[taskCommId].getTaskCommunicator().getClass().getName(); + } + + @Override + public String getInProgressLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId) { + try { + return taskCommunicators[taskCommId].getInProgressLogsUrl(attemptID, containerNodeId); + } catch (Exception e) { + LOG.warn("Failed to retrieve InProgressLogsUrl from TaskCommunicator," + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context) + + ", attemptId=" + attemptID, e); + } + return null; + } + + @Override + public String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId) { + try { + return taskCommunicators[taskCommId].getCompletedLogsUrl(attemptID, containerNodeId); + } catch (Exception e) { + LOG.warn("Failed to retrieve CompletedLogsUrl from TaskCommunicator," + + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context) + + ", attemptId=" + attemptID, e); + } + return null; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java index e0f9852..254e74c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginError; @@ -46,4 +47,11 @@ public interface TaskCommunicatorManagerInterface { TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex); void reportError(int taskCommIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagName); + + String getTaskCommunicatorClassName(int taskCommId); + + String getInProgressLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId); + + String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId); + } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java index 4a75875..4afb427 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.serviceplugins.api.TaskCommunicator; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -80,4 +81,15 @@ public class TaskCommunicatorWrapper { public TaskCommunicator getTaskCommunicator() { return real; } + + public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) + throws Exception { + return real.getInProgressLogsUrl(attemptID, containerNodeId); + } + + public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) + throws Exception { + return real.getCompletedLogsUrl(attemptID, containerNodeId); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 25fbf3a..1b3b39c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -49,6 +49,7 @@ import org.apache.tez.dag.app.TaskAttemptEventInfo; import org.apache.tez.dag.app.dag.event.SpeculatorEvent; import org.apache.tez.dag.app.dag.impl.AMUserCodeException; import org.apache.tez.dag.app.dag.impl.Edge; +import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; @@ -190,6 +191,8 @@ public interface Vertex extends Comparable<Vertex> { public int getContainerLauncherIdentifier(); public int getTaskCommunicatorIdentifier(); + public ServicePluginInfo getServicePluginInfo(); + public long getInitTime(); public long getStartTime(); public long getFinishTime(); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java new file mode 100644 index 0000000..2ff1ee5 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + + +public class ServicePluginInfo { + + private String containerLauncherName; + private String taskSchedulerName; + private String taskCommunicatorName; + private String containerLauncherClassName; + private String taskSchedulerClassName; + private String taskCommunicatorClassName; + + public ServicePluginInfo() { + } + + public String getContainerLauncherName() { + return containerLauncherName; + } + + public ServicePluginInfo setContainerLauncherName(String containerLauncherName) { + this.containerLauncherName = containerLauncherName; + return this; + } + + public String getTaskSchedulerName() { + return taskSchedulerName; + } + + public ServicePluginInfo setTaskSchedulerName(String taskSchedulerName) { + this.taskSchedulerName = taskSchedulerName; + return this; + } + + public String getTaskCommunicatorName() { + return taskCommunicatorName; + } + + public ServicePluginInfo setTaskCommunicatorName(String taskCommunicatorName) { + this.taskCommunicatorName = taskCommunicatorName; + return this; + } + + public String getContainerLauncherClassName() { + return containerLauncherClassName; + } + + public ServicePluginInfo setContainerLauncherClassName(String containerLauncherClassName) { + this.containerLauncherClassName = containerLauncherClassName; + return this; + } + + public String getTaskSchedulerClassName() { + return taskSchedulerClassName; + } + + public ServicePluginInfo setTaskSchedulerClassName(String taskSchedulerClassName) { + this.taskSchedulerClassName = taskSchedulerClassName; + return this; + } + + public String getTaskCommunicatorClassName() { + return taskCommunicatorClassName; + } + + public ServicePluginInfo setTaskCommunicatorClassName(String taskCommunicatorClassName) { + this.taskCommunicatorClassName = taskCommunicatorClassName; + return this; + } + + @Override + public String toString() { + return "ServicePluginInfo {" + + "containerLauncherName=" + containerLauncherName + + ", taskSchedulerName=" + taskSchedulerName + + ", taskCommunicatorName=" + taskCommunicatorName + + ", containerLauncherClassName=" + containerLauncherClassName + + ", taskSchedulerClassName=" + taskSchedulerClassName + + ", taskCommunicatorClassName=" + taskCommunicatorClassName + + " }"; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 6169a7b..6d9247e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; @@ -1107,31 +1108,49 @@ public class TaskAttemptImpl implements TaskAttempt, private String getInProgressLogsUrl() { String inProgressLogsUrl = null; - if (containerId != null && nodeHttpAddress != null) { - final String containerIdStr = containerId.toString(); - inProgressLogsUrl = nodeHttpAddress - + "/" + "node/containerlogs" - + "/" + containerIdStr - + "/" + this.appContext.getUser(); + if (getVertex().getServicePluginInfo().getContainerLauncherName().equals( + TezConstants.getTezYarnServicePluginName()) + || getVertex().getServicePluginInfo().getContainerLauncherName().equals( + TezConstants.getTezUberServicePluginName())) { + if (containerId != null && nodeHttpAddress != null) { + final String containerIdStr = containerId.toString(); + inProgressLogsUrl = nodeHttpAddress + + "/" + "node/containerlogs" + + "/" + containerIdStr + + "/" + this.appContext.getUser(); + } + } else { + inProgressLogsUrl = appContext.getTaskCommunicatorManager().getInProgressLogsUrl( + getVertex().getTaskCommunicatorIdentifier(), + attemptId, containerNodeId); } return inProgressLogsUrl; } private String getCompletedLogsUrl() { String completedLogsUrl = null; - if (containerId != null && containerNodeId != null && nodeHttpAddress != null) { - final String containerIdStr = containerId.toString(); - if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) - && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { - String contextStr = "v_" + getVertex().getName() - + "_" + this.attemptId.toString(); - completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) - + "/" + containerNodeId.toString() - + "/" + containerIdStr - + "/" + contextStr - + "/" + this.appContext.getUser(); + if (getVertex().getServicePluginInfo().getContainerLauncherName().equals( + TezConstants.getTezYarnServicePluginName()) + || getVertex().getServicePluginInfo().getContainerLauncherName().equals( + TezConstants.getTezUberServicePluginName())) { + if (containerId != null && containerNodeId != null && nodeHttpAddress != null) { + final String containerIdStr = containerId.toString(); + if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) + && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { + String contextStr = "v_" + getVertex().getName() + + "_" + this.attemptId.toString(); + completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) + + "/" + containerNodeId.toString() + + "/" + containerIdStr + + "/" + contextStr + + "/" + this.appContext.getUser(); + } } + } else { + completedLogsUrl = appContext.getTaskCommunicatorManager().getCompletedLogsUrl( + getVertex().getTaskCommunicatorIdentifier(), + attemptId, containerNodeId); } return completedLogsUrl; } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index ea202f7..b22af1a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -247,6 +247,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @VisibleForTesting final int taskCommunicatorIdentifier; + final ServicePluginInfo servicePluginInfo; + + //fields initialized in init @VisibleForTesting @@ -1009,6 +1012,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl LOG.error("Failed to get index for containerLauncher: " + containerLauncherName); throw e; } + this.servicePluginInfo = new ServicePluginInfo() + .setContainerLauncherName( + appContext.getContainerLauncherName(this.containerLauncherIdentifier)) + .setTaskSchedulerName(appContext.getTaskSchedulerName(this.taskSchedulerIdentifier)) + .setTaskCommunicatorName(appContext.getTaskCommunicatorName(this.taskCommunicatorIdentifier)) + .setContainerLauncherClassName( + appContext.getContainerLauncherClassName(this.containerLauncherIdentifier)) + .setTaskSchedulerClassName( + appContext.getTaskSchedulerClassName(this.taskSchedulerIdentifier)) + .setTaskCommunicatorClassName( + appContext.getTaskCommunicatorClassName(this.taskCommunicatorIdentifier)); StringBuilder sb = new StringBuilder(); sb.append("Running vertex: ").append(logIdentifier).append(" : ") @@ -1043,6 +1057,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } @Override + public ServicePluginInfo getServicePluginInfo() { + return servicePluginInfo; + } + + @Override public boolean isSpeculationEnabled() { return isSpeculationEnabled; } @@ -1923,8 +1942,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl void logJobHistoryVertexInitializedEvent() { if (recoveryData == null || !recoveryData.shouldSkipInit()) { VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName, - initTimeRequested, initedTime, numTasks, - getProcessorName(), getAdditionalInputs(), initGeneratedEvents); + initTimeRequested, initedTime, numTasks, + getProcessorName(), getAdditionalInputs(), initGeneratedEvents, + servicePluginInfo); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGId(), initEvt)); } @@ -1989,9 +2009,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get()); taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get()); - VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, initTimeRequested, - initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics, - counters, getVertexStats(), taskStats); + VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, + initTimeRequested, initedTime, startTimeRequested, startedTime, finishTime, finalState, + diagnostics, counters, getVertexStats(), taskStats, servicePluginInfo); this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(getDAGId(), finishEvt)); } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index 1f5151b..f2c1cff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -279,4 +279,8 @@ public class ContainerLauncherManager extends AbstractService private void sendEvent(Event<?> event) { appContext.getEventHandler().handle(event); } + + public String getContainerLauncherClassName(int containerLauncherIndex) { + return containerLaunchers[containerLauncherIndex].getContainerLauncher().getClass().getName(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 16f9a28..e68c9b8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -996,4 +996,9 @@ public class TaskSchedulerManager extends AbstractService implements return historyUrl; } + + public String getTaskSchedulerClassName(int taskSchedulerIndex) { + return taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java index f914165..a2cdae2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Map; +import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.common.counters.TezCounters; @@ -55,12 +56,14 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { private boolean fromSummary = false; private VertexStats vertexStats; private Map<String, Integer> vertexTaskStats; + private ServicePluginInfo servicePluginInfo; public VertexFinishedEvent(TezVertexID vertexId, String vertexName, int numTasks, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime, VertexState state, String diagnostics, TezCounters counters, VertexStats vertexStats, - Map<String, Integer> vertexTaskStats) { + Map<String, Integer> vertexTaskStats, + ServicePluginInfo servicePluginInfo) { this.vertexName = vertexName; this.vertexID = vertexId; this.numTasks = numTasks; @@ -74,6 +77,7 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { this.tezCounters = counters; this.vertexStats = vertexStats; this.vertexTaskStats = vertexTaskStats; + this.servicePluginInfo = servicePluginInfo; } public VertexFinishedEvent() { @@ -147,7 +151,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { + ", counters=" + ( tezCounters == null ? "null" : tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", " ")) + ", vertexStats=" + (vertexStats == null ? "null" : vertexStats.toString()) - + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString()); + + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString()) + + ", servicePluginInfo=" + + (servicePluginInfo != null ? servicePluginInfo : "null"); } public TezVertexID getVertexID() { @@ -227,4 +233,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { public boolean isFromSummary() { return fromSummary; } + + public ServicePluginInfo getServicePluginInfo() { + return servicePluginInfo; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java index 052908b..90099fc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java @@ -30,6 +30,7 @@ import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; +import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.utils.TezEventUtils; @@ -51,6 +52,7 @@ public class VertexInitializedEvent implements HistoryEvent { private String processorName; private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs; private List<TezEvent> initGeneratedEvents; + private ServicePluginInfo servicePluginInfo; public VertexInitializedEvent() { } @@ -59,7 +61,7 @@ public class VertexInitializedEvent implements HistoryEvent { String vertexName, long initRequestedTime, long initedTime, int numTasks, String processorName, Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs, - List<TezEvent> initGeneratedEvents) { + List<TezEvent> initGeneratedEvents, ServicePluginInfo servicePluginInfo) { this.vertexName = vertexName; this.vertexID = vertexId; this.initRequestedTime = initRequestedTime; @@ -68,6 +70,7 @@ public class VertexInitializedEvent implements HistoryEvent { this.processorName = processorName; this.additionalInputs = additionalInputs; this.initGeneratedEvents = initGeneratedEvents; + this.servicePluginInfo = servicePluginInfo; } @Override @@ -173,7 +176,9 @@ public class VertexInitializedEvent implements HistoryEvent { + ", additionalInputsCount=" + (additionalInputs != null ? additionalInputs.size() : 0) + ", initGeneratedEventsCount=" - + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0); + + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0) + + ", servicePluginInfo=" + + (servicePluginInfo != null ? servicePluginInfo : "null"); } public TezVertexID getVertexID() { @@ -208,4 +213,9 @@ public class VertexInitializedEvent implements HistoryEvent { public List<TezEvent> getInitGeneratedEvents() { return initGeneratedEvents; } + + public ServicePluginInfo getServicePluginInfo() { + return servicePluginInfo; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 75116c8..a767fbf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -716,6 +716,10 @@ public class HistoryEventJsonConversion { otherInfo.put(entry.getKey(), entry.getValue().intValue()); } } + if (event.getServicePluginInfo() != null) { + otherInfo.put(ATSConstants.SERVICE_PLUGIN, + DAGUtils.convertServicePluginToJSON(event.getServicePluginInfo())); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); @@ -787,6 +791,10 @@ public class HistoryEventJsonConversion { otherInfo.put(ATSConstants.INIT_TIME, event.getInitedTime()); otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName()); + if (event.getServicePluginInfo() != null) { + otherInfo.put(ATSConstants.SERVICE_PLUGIN, + DAGUtils.convertServicePluginToJSON(event.getServicePluginInfo())); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 781120c..f1ac0ab 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; +import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.history.logging.EntityTypes; @@ -405,6 +406,34 @@ public class DAGUtils { return vertexStatsMap; } + public static JSONObject convertServicePluginToJSON( + ServicePluginInfo servicePluginInfo) { + JSONObject jsonObject = new JSONObject(convertServicePluginToATSMap(servicePluginInfo)); + return jsonObject; + } + + public static Map<String,Object> convertServicePluginToATSMap( + ServicePluginInfo servicePluginInfo) { + Map<String, Object> servicePluginMap = new LinkedHashMap<String, Object>(); + if (servicePluginInfo == null) { + return servicePluginMap; + } + servicePluginMap.put(ATSConstants.TASK_SCHEDULER_NAME, + servicePluginInfo.getTaskSchedulerName()); + servicePluginMap.put(ATSConstants.TASK_SCHEDULER_CLASS_NAME, + servicePluginInfo.getTaskSchedulerClassName()); + servicePluginMap.put(ATSConstants.TASK_COMMUNICATOR_NAME, + servicePluginInfo.getTaskCommunicatorName()); + servicePluginMap.put(ATSConstants.TASK_COMMUNICATOR_CLASS_NAME, + servicePluginInfo.getTaskCommunicatorClassName()); + servicePluginMap.put(ATSConstants.CONTAINER_LAUNCHER_NAME, + servicePluginInfo.getContainerLauncherName()); + servicePluginMap.put(ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME, + servicePluginInfo.getContainerLauncherClassName()); + return servicePluginMap; + } + + public static Map<String,Object> convertEdgeProperty( EdgeProperty edge) { Map<String, Object> jsonDescriptor = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java index 8f919d1..34debd4 100644 --- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java @@ -35,6 +35,7 @@ import java.util.Map; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.ServicePluginLifecycle; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -229,4 +230,25 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle { * This will cause the app to shutdown. */ public abstract Object getMetaInfo() throws ServicePluginException; + + /** + * Return a URL that can be used as a link to the logs for a running attempt. + * @param attemptID Attempt ID for which the log link should be provided + * @param containerNodeId Node Id on which the attempt is meant to have run + * @return URL to logs for the attempt + */ + public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { + return null; + } + + /** + * Return a URL that can be used as a link to the logs for a completed attempt. + * @param attemptID Attempt ID for which the log link should be provided + * @param containerNodeId Node Id on which the attempt is meant to have run + * @return URL to logs for the attempt + */ + public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { + return null; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index 962b230..f4edf9e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -453,7 +453,7 @@ public class TestRecoveryParser { rService.handle(new DAGHistoryEvent(dagID, new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, - "", null, null, null))); + "", null, null, null, null))); rService.stop(); DAGRecoveryData dagData = parser.parseRecoveryData(); @@ -531,11 +531,11 @@ public class TestRecoveryParser { rService.handle(new DAGHistoryEvent(dagID, new VertexFinishedEvent(v0, "v1", 10, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, - "", null, null, null))); + "", null, null, null, null))); rService.handle(new DAGHistoryEvent(dagID, new VertexFinishedEvent(v1, "v1", 10, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, - "", null, null, null))); + "", null, null, null, null))); rService.stop(); DAGRecoveryData dagData = parser.parseRecoveryData(); @@ -573,7 +573,7 @@ public class TestRecoveryParser { rService.handle(new DAGHistoryEvent(dagID, new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, - "", null, null, null))); + "", null, null, null, null))); rService.stop(); DAGRecoveryData dagData = parser.parseRecoveryData(); @@ -652,18 +652,19 @@ public class TestRecoveryParser { TezVertexID v1Id = TezVertexID.getInstance(dagID, 1); TezVertexID v2Id = TezVertexID.getInstance(dagID, 2); // v0 VertexInitializedEvent - VertexInitializedEvent v0InitedEvent = new VertexInitializedEvent(v0Id, "v0", 200L, 400L, 2, null, null, null); + VertexInitializedEvent v0InitedEvent = new VertexInitializedEvent( + v0Id, "v0", 200L, 400L, 2, null, null, null, null); rService.handle(new DAGHistoryEvent(dagID, v0InitedEvent)); // v1 VertexFinishedEvent(KILLED) VertexFinishedEvent v1FinishedEvent = new VertexFinishedEvent(v1Id, "v1", 2, 300L, 400L, 500L, 600L, 700L, VertexState.KILLED, - "", null, null, null); + "", null, null, null, null); rService.handle(new DAGHistoryEvent(dagID, v1FinishedEvent)); // v2 VertexInitializedEvent -> VertexStartedEvent List<TezEvent> initGeneratedEvents = Lists.newArrayList( new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null)); VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, "v2", 200L, 300L, - 2, null, null, initGeneratedEvents); + 2, null, null, initGeneratedEvents, null); VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 0L); rService.handle(new DAGHistoryEvent(dagID, v2InitedEvent)); rService.handle(new DAGHistoryEvent(dagID, v2StartedEvent)); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 480e3cf..4471278 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.app.dag.impl; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.hadoop.shim.HadoopShim; @@ -986,6 +987,8 @@ public class TestDAGImpl { doReturn(clusterInfo).when(dagWithCustomEdgeAppContext).getClusterInfo(); dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2()); dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler()); + when(dagWithCustomEdgeAppContext.getContainerLauncherName(anyInt())).thenReturn( + TezConstants.getTezYarnServicePluginName()); } private void initDAG(DAGImpl impl) { http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index be1821b..3c284ec 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -700,7 +700,7 @@ public class TestDAGRecovery { List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>(); VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, "vertex1", 0L, v1InitedTime, - v1NumTask, "", null, inputGeneratedTezEvents); + v1NumTask, "", null, inputGeneratedTezEvents, null); VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent, null, null, null, null, false); doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id); @@ -732,7 +732,7 @@ public class TestDAGRecovery { List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>(); VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, "vertex1", 0L, v1InitedTime, - v1NumTask, "", null, inputGeneratedTezEvents); + v1NumTask, "", null, inputGeneratedTezEvents, null); VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 0L, v1NumTask, null, null, null, true); VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent, @@ -772,7 +772,7 @@ public class TestDAGRecovery { List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>(); VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, "vertex1", 0L, v1InitedTime, - v1NumTask, "", null, inputGeneratedTezEvents); + v1NumTask, "", null, inputGeneratedTezEvents, null); VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 0L, v1NumTask, null, null, null, true); VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime); @@ -803,7 +803,7 @@ public class TestDAGRecovery { List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>(); VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, "vertex1", 0L, v1InitedTime, - v1NumTask, "", null, inputGeneratedTezEvents); + v1NumTask, "", null, inputGeneratedTezEvents, null); Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>(); VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 0L, v1NumTask, null, null, rootInputSpecs, true); @@ -911,7 +911,7 @@ public class TestDAGRecovery { List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>(); VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, "vertex1", 0L, v1InitedTime, - v1NumTask, "", null, inputGeneratedTezEvents); + v1NumTask, "", null, inputGeneratedTezEvents, null); Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>(); VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 0L, v1NumTask, null, null, rootInputSpecs, true); @@ -1079,7 +1079,7 @@ public class TestDAGRecovery { VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, "vertex2", 0L, v1InitedTime, - v1NumTask, "", null, null); + v1NumTask, "", null, null, null); VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v2Id, 0L, v1NumTask, null, null, null, false); VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, v1StartedTime); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 68cfc39..e4cd956 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.tez.common.MockDNSToSwitchMapping; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; @@ -139,6 +141,9 @@ public class TestTaskAttempt { AppContext appCtx; Task mockTask; TaskLocationHint locationHint; + Vertex mockVertex; + ServicePluginInfo servicePluginInfo = new ServicePluginInfo() + .setContainerLauncherName(TezConstants.getTezYarnServicePluginName()); @BeforeClass public static void setup() { @@ -148,7 +153,14 @@ public class TestTaskAttempt { @Before public void setupTest() { appCtx = mock(AppContext.class); + when(appCtx.getContainerLauncherName(anyInt())).thenReturn( + TezConstants.getTezYarnServicePluginName()); + mockTask = mock(Task.class); + mockVertex = mock(Vertex.class); + when(mockTask.getVertex()).thenReturn(mockVertex); + when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo); + HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); LogManager.getRootLogger().setLevel(Level.DEBUG); @@ -1737,7 +1749,6 @@ public class TestTaskAttempt { } - Vertex mockVertex = mock(Vertex.class); boolean inputFailedReported = false; @Override http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index d2d9a07..6b30a24 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; import org.apache.tez.hadoop.shim.DefaultHadoopShim; @@ -2354,6 +2355,9 @@ public class TestVertexImpl { dispatcher = new DrainDispatcher(); appContext = mock(AppContext.class); when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getContainerLauncherName(anyInt())).thenReturn( + TezConstants.getTezYarnServicePluginName()); + thh = mock(TaskHeartbeatHandler.class); historyEventHandler = mock(HistoryEventHandler.class); TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 5e7e906..7d3faf9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -276,7 +276,7 @@ public class TestHistoryEventsProtoConversion { VertexInitializedEvent event = new VertexInitializedEvent( TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), - "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents); + "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents, null); VertexInitializedEvent deserializedEvent = (VertexInitializedEvent) testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); @@ -386,7 +386,7 @@ public class TestHistoryEventsProtoConversion { new VertexFinishedEvent(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR, - null, null, null, null); + null, null, null, null, null); VertexFinishedEvent deserializedEvent = (VertexFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); @@ -401,7 +401,7 @@ public class TestHistoryEventsProtoConversion { new VertexFinishedEvent(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR, - "diagnose", new TezCounters(), new VertexStats(), null); + "diagnose", new TezCounters(), new VertexStats(), null, null); VertexFinishedEvent deserializedEvent = (VertexFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID()); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 5c596c5..9477118 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -138,7 +138,7 @@ public class TestHistoryEventJsonConversion { break; case VERTEX_INITIALIZED: event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), - random.nextInt(), "proc", null, null); + random.nextInt(), "proc", null, null, null); break; case VERTEX_STARTED: event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt()); @@ -149,7 +149,7 @@ public class TestHistoryEventJsonConversion { case VERTEX_FINISHED: event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR, - null, null, null, null); + null, null, null, null, null); break; case TASK_STARTED: event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt()); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 1e77ce8..62f0937 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -515,8 +515,12 @@ public class HistoryEventTimelineConversion { atsEntity.addEvent(startEvt); atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime()); - atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); - atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + if (event.getInProgressLogsUrl() != null) { + atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + } + if (event.getCompletedLogsUrl() != null) { + atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + } atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); @@ -615,6 +619,10 @@ public class HistoryEventTimelineConversion { DAGUtils.convertCountersToATSMap(event.getTezCounters())); atsEntity.addOtherInfo(ATSConstants.STATS, DAGUtils.convertVertexStatsToATSMap(event.getVertexStats())); + if (event.getServicePluginInfo() != null) { + atsEntity.addOtherInfo(ATSConstants.SERVICE_PLUGIN, + DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo())); + } final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats(); if (vertexTaskStats != null) { @@ -651,6 +659,10 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime()); atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName()); + if (event.getServicePluginInfo() != null) { + atsEntity.addOtherInfo(ATSConstants.SERVICE_PLUGIN, + DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo())); + } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index abfd757..f622056 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -47,6 +47,7 @@ import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.app.web.AMWebController; @@ -157,7 +158,7 @@ public class TestHistoryEventTimelineConversion { break; case VERTEX_INITIALIZED: event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(), - random.nextInt(), "proc", null, null); + random.nextInt(), "proc", null, null, null); break; case VERTEX_STARTED: event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt()); @@ -168,7 +169,7 @@ public class TestHistoryEventTimelineConversion { case VERTEX_FINISHED: event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR, - null, null, null, null); + null, null, null, null, null); break; case TASK_STARTED: event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt()); @@ -677,13 +678,19 @@ public class TestHistoryEventTimelineConversion { ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue()); } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testConvertVertexInitializedEvent() { long initRequestedTime = random.nextLong(); long initedTime = random.nextLong(); int numTasks = random.nextInt(); VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime, - initedTime, numTasks, "proc", null, null); + initedTime, numTasks, "proc", null, null, + new ServicePluginInfo().setContainerLauncherName("abc") + .setTaskSchedulerName("def").setTaskCommunicatorName("ghi") + .setContainerLauncherClassName("abc1") + .setTaskSchedulerClassName("def1") + .setTaskCommunicatorClassName("ghi1")); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); @@ -720,6 +727,25 @@ public class TestHistoryEventTimelineConversion { ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue()); Assert.assertEquals(numTasks, ((Integer) timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue()); + Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)); + Assert.assertEquals("abc", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.CONTAINER_LAUNCHER_NAME)); + Assert.assertEquals("def", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_SCHEDULER_NAME)); + Assert.assertEquals("ghi", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_COMMUNICATOR_NAME)); + Assert.assertEquals("abc1", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME)); + Assert.assertEquals("def1", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_SCHEDULER_CLASS_NAME)); + Assert.assertEquals("ghi1", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_COMMUNICATOR_CLASS_NAME)); } @Test(timeout = 5000) @@ -756,6 +782,7 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testConvertVertexFinishedEvent() { long initRequestedTime = random.nextLong(); @@ -770,7 +797,12 @@ public class TestHistoryEventTimelineConversion { VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1, initRequestedTime, initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR, - "diagnostics", null, vertexStats, taskStats); + "diagnostics", null, vertexStats, taskStats, + new ServicePluginInfo().setContainerLauncherName("abc") + .setTaskSchedulerName("def").setTaskCommunicatorName("ghi") + .setContainerLauncherClassName("abc1") + .setTaskSchedulerClassName("def1") + .setTaskCommunicatorClassName("ghi1")); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType()); @@ -801,6 +833,25 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.STATUS)); Assert.assertEquals("diagnostics", timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS)); + Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)); + Assert.assertEquals("abc", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.CONTAINER_LAUNCHER_NAME)); + Assert.assertEquals("def", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_SCHEDULER_NAME)); + Assert.assertEquals("ghi", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_COMMUNICATOR_NAME)); + Assert.assertEquals("abc1", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME)); + Assert.assertEquals("def1", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_SCHEDULER_CLASS_NAME)); + Assert.assertEquals("ghi1", + ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get( + ATSConstants.TASK_COMMUNICATOR_CLASS_NAME)); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS)); http://git-wip-us.apache.org/repos/asf/tez/blob/7e9ed831/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java index 3f669c6..b9229e2 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java @@ -165,13 +165,13 @@ public class TestRecovery { ApplicationAttemptId.newInstance(appId, 1), null)), new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0, - "", null, initGeneratedEvents)), + "", null, initGeneratedEvents, null)), new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(vertexId1, "Summation", 0L, 0L, 0, - "", null, null)), + "", null, null, null)), new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(vertexId2, "Sorter", 0L, 0L, 0, "", - null, null)), + null, null, null)), new SimpleShutdownCondition(TIMING.POST, new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null, @@ -194,15 +194,15 @@ public class TestRecovery { new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), - new VertexStats(), new HashMap<String, Integer>())), + new VertexStats(), new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), - new VertexStats(), new HashMap<String, Integer>())), + new VertexStats(), new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), - new VertexStats(), new HashMap<String, Integer>())), + new VertexStats(), new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent( TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)), @@ -376,11 +376,11 @@ public class TestRecovery { "dagName", new HashMap<String, Integer>(), ApplicationAttemptId .newInstance(appId, 1), null)), new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent( - vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents)), + vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents, null)), new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent( - vertexId1, "streamingSide", 0L, 0L, 0, "", null, null)), + vertexId1, "streamingSide", 0L, 0L, 0, "", null, null, null)), new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent( - vertexId2, "joiner", 0L, 0L, 0, "", null, null)), + vertexId2, "joiner", 0L, 0L, 0, "", null, null, null)), new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( vertexId0, 0L, 0L)), @@ -404,15 +404,15 @@ public class TestRecovery { new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), - new HashMap<String, Integer>())), + new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), - new HashMap<String, Integer>())), + new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), - new HashMap<String, Integer>())), + new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID .getInstance(vertexId0, 0), "vertexName", 0L, 0L)), @@ -574,7 +574,7 @@ public class TestRecovery { 0L, "username", "dagName")), new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0, - "", null, initGeneratedEvents)), + "", null, initGeneratedEvents, null)), new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent( vertexId0, 0L, 0L)), new SimpleShutdownCondition(TIMING.POST, @@ -592,15 +592,15 @@ public class TestRecovery { new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), - new VertexStats(), new HashMap<String, Integer>())), + new VertexStats(), new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), - new VertexStats(), new HashMap<String, Integer>())), + new VertexStats(), new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent( vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), - new VertexStats(), new HashMap<String, Integer>())), + new VertexStats(), new HashMap<String, Integer>(), null)), new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent( dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), "username", "dagName", new HashMap<String, Integer>(),
