Include container log URLs and counters as part of history events in the AM logs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6d5625f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6d5625f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6d5625f4 Branch: refs/heads/master Commit: 6d5625f4fa38cf35e6f7201fb4b18c47ec6964d3 Parents: 34de34b Author: Hitesh Shah <[email protected]> Authored: Thu May 30 13:34:38 2013 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu May 30 13:34:38 2013 -0700 ---------------------------------------------------------------------- tez-dag/src/main/avro/HistoryEvents.avpr | 35 +++++++- .../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 5 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 27 ++++++- .../org/apache/tez/dag/app/dag/impl/TaskImpl.java | 5 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 6 +- .../apache/tez/dag/history/events/AvroUtils.java | 53 ++++++++++++ .../tez/dag/history/events/DAGFinishedEvent.java | 9 ++- .../history/events/TaskAttemptFinishedEvent.java | 10 ++- .../history/events/TaskAttemptStartedEvent.java | 11 ++- .../tez/dag/history/events/TaskFinishedEvent.java | 9 ++- .../dag/history/events/VertexFinishedEvent.java | 10 ++- .../tez/dag/app/rm/container/TestAMContainer.java | 3 +- .../apache/tez/mapreduce/examples/MRRSleepJob.java | 65 +++++++++------ 13 files changed, 200 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/avro/HistoryEvents.avpr ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/avro/HistoryEvents.avpr b/tez-dag/src/main/avro/HistoryEvents.avpr index 04333e6..8805c6d 100644 --- a/tez-dag/src/main/avro/HistoryEvents.avpr +++ b/tez-dag/src/main/avro/HistoryEvents.avpr @@ -21,6 +21,29 @@ "types": [ + {"type": "record", "name": "TezCounter", + "fields": [ + 
{"name": "name", "type": "string"}, + {"name": "displayName", "type": "string"}, + {"name": "value", "type": "long"} + ] + }, + + {"type": "record", "name": "TezCounterGroup", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "displayName", "type": "string"}, + {"name": "counts", "type": {"type": "array", "items": "TezCounter"}} + ] + }, + + {"type": "record", "name": "TezCounters", + "fields": [ + {"name": "groups", "type": {"type": "array", "items": "TezCounterGroup"}} + ] + }, + + {"type": "record", "name": "AMStarted", "fields": [ {"name": "applicationAttemptId", "type": "string"}, @@ -50,7 +73,8 @@ {"name": "dagId", "type": "string"}, {"name": "finishTime", "type": "long"}, {"name": "status", "type": "string"}, - {"name": "diagnostics", "type": "string"} + {"name": "diagnostics", "type": "string"}, + {"name": "counters", "type": "TezCounters"} ] }, @@ -71,7 +95,8 @@ {"name": "vertexId", "type": "string"}, {"name": "finishTime", "type": "long"}, {"name": "status", "type": "string"}, - {"name": "diagnostics", "type": "string"} + {"name": "diagnostics", "type": "string"}, + {"name": "counters", "type": "TezCounters"} ] }, @@ -89,7 +114,8 @@ {"name": "vertexName", "type": "string"}, {"name": "taskId", "type": "string"}, {"name": "finishTime", "type": "long"}, - {"name": "status", "type": "string"} + {"name": "status", "type": "string"}, + {"name": "counters", "type": "TezCounters"} ] }, @@ -109,7 +135,8 @@ {"name": "taskAttemptId", "type": "string"}, {"name": "finishTime", "type": "long"}, {"name": "status", "type": "string"}, - {"name": "diagnostics", "type": "string"} + {"name": "diagnostics", "type": "string"}, + {"name": "counters", "type": "TezCounters"} ] }, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 3681c85..d2395ad 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -625,7 +625,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, void logJobHistoryFinishedEvent() { this.setFinishTime(); DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, finishTime, - DAGStatus.State.SUCCEEDED, ""); + DAGStatus.State.SUCCEEDED, "", getAllCounters()); this.eventHandler.handle( new DAGHistoryEvent(dagId, finishEvt)); } @@ -641,7 +641,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) { DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, clock.getTime(), - state, StringUtils.join(LINE_SEPARATOR, getDiagnostics())); + state, StringUtils.join(LINE_SEPARATOR, getDiagnostics()), + getAllCounters()); this.eventHandler.handle( new DAGHistoryEvent(dagId, finishEvt)); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 5225ecf..c14ba8a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import 
org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -794,9 +795,27 @@ public class TaskAttemptImpl implements TaskAttempt, @SuppressWarnings("unchecked") protected void logJobHistoryAttemptStarted() { + final String containerIdStr = containerId.toString(); + String inProgressLogsUrl = nodeHttpAddress + + "/" + "node/containerlogs" + + "/" + containerIdStr + + "/" + this.appContext.getUser(); + String completedLogsUrl = ""; + if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) + && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { + String contextStr = "v_" + getTask().getVertex().getName() + + "_" + this.attemptId.toString(); + completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) + + "/" + containerNodeId.toString() + + "/" + containerIdStr + + "/" + contextStr + + "/" + this.appContext.getUser(); + } TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getTask().getVertex().getName(), - launchTime, containerId, containerNodeId); + launchTime, containerId, containerNodeId, + inProgressLogsUrl, completedLogsUrl); eventHandler.handle(new DAGHistoryEvent( attemptId.getTaskID().getVertexID().getDAGId(), startEvt)); @@ -809,7 +828,8 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getTask().getVertex().getName(), - getFinishTime(), TaskAttemptState.SUCCEEDED, ""); + getFinishTime(), TaskAttemptState.SUCCEEDED, "", + getCounters()); // FIXME how do we store information regd completion events eventHandler.handle(new DAGHistoryEvent( attemptId.getTaskID().getVertexID().getDAGId(), @@ -823,7 +843,8 @@ public class TaskAttemptImpl implements TaskAttempt, attemptId, getTask().getVertex().getName(), clock.getTime(), state, StringUtils.join( - LINE_SEPARATOR, getDiagnostics())); + LINE_SEPARATOR, getDiagnostics()), + getCounters()); // FIXME how do we store information regd 
completion events eventHandler.handle(new DAGHistoryEvent( attemptId.getTaskID().getVertexID().getDAGId(), http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index fb738ce..575f32c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -806,14 +806,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // FIXME need to handle getting finish time as this function // is called from within a transition TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId, - getVertex().getName(), clock.getTime(), TaskState.SUCCEEDED); + getVertex().getName(), clock.getTime(), TaskState.SUCCEEDED, + getCounters()); this.eventHandler.handle(new DAGHistoryEvent( taskId.getVertexID().getDAGId(), finishEvt)); } protected void logJobHistoryTaskFailedEvent(TaskState finalState) { TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId, - getVertex().getName(), clock.getTime(), finalState); + getVertex().getName(), clock.getTime(), finalState, getCounters()); this.eventHandler.handle(new DAGHistoryEvent( taskId.getVertexID().getDAGId(), finishEvt)); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index e34382f..c7bceb6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -670,14 +670,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, void logJobHistoryVertexFinishedEvent() { this.setFinishTime(); VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, - vertexName, finishTime, VertexStatus.State.SUCCEEDED, ""); + vertexName, finishTime, VertexStatus.State.SUCCEEDED, "", + getAllCounters()); this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt)); } void logJobHistoryVertexFailedEvent(VertexStatus.State state) { VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, clock.getTime(), state, - StringUtils.join(LINE_SEPARATOR, getDiagnostics())); + StringUtils.join(LINE_SEPARATOR, getDiagnostics()), + getAllCounters()); this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt)); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java new file mode 100644 index 0000000..3587964 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.events; + +import java.util.ArrayList; + +import org.apache.avro.util.Utf8; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.dag.history.avro.TezCounter; +import org.apache.tez.dag.history.avro.TezCounterGroup; +import org.apache.tez.dag.history.avro.TezCounters; + +public class AvroUtils { + + public static TezCounters toAvro( + org.apache.tez.common.counters.TezCounters counters) { + TezCounters result = new TezCounters(); + result.groups = new ArrayList<TezCounterGroup>(0); + if (counters == null) return result; + for (CounterGroup group : counters) { + TezCounterGroup g = new TezCounterGroup(); + g.name = new Utf8(group.getName()); + g.displayName = new Utf8(group.getDisplayName()); + g.counts = new ArrayList<TezCounter>(group.size()); + for (org.apache.tez.common.counters.TezCounter counter : group) { + TezCounter c = new TezCounter(); + c.name = new Utf8(counter.getName()); + c.displayName = new Utf8(counter.getDisplayName()); + c.value = counter.getValue(); + g.counts.add(c); + } + result.groups.add(g); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java index 8794cea..8ded3d4 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.history.events; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.avro.DAGFinished; @@ -27,14 +28,17 @@ import org.apache.tez.dag.records.TezDAGID; public class DAGFinishedEvent implements HistoryEvent { private DAGFinished datum = new DAGFinished(); + // FIXME remove this when we have a proper history + private final TezCounters tezCounters; public DAGFinishedEvent(TezDAGID dagId, long finishTime, DAGStatus.State state, - String diagnostics) { + String diagnostics, TezCounters counters) { datum.dagId = dagId.toString(); datum.finishTime = finishTime; datum.status = state.name(); datum.diagnostics = diagnostics; + tezCounters = counters; } @Override @@ -58,6 +62,7 @@ public class DAGFinishedEvent implements HistoryEvent { return "dagId=" + datum.dagId + ", finishTime=" + datum.finishTime + ", status=" + datum.status - + ", diagnostics=" + datum.diagnostics; + + ", diagnostics=" + datum.diagnostics + + ", counters=" + tezCounters.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 395ba93..9794938 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.history.events; +import 
org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.avro.HistoryEventType; @@ -27,17 +28,21 @@ import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptFinishedEvent implements HistoryEvent { private TaskAttemptFinished datum = new TaskAttemptFinished(); + // FIXME remove this when we have a proper history + private final TezCounters tezCounters; public TaskAttemptFinishedEvent(TezTaskAttemptID taId, String vertexName, long finishTime, TaskAttemptState state, - String diagnostics) { + String diagnostics, + TezCounters counters) { datum.taskAttemptId = taId.toString(); datum.vertexName = vertexName; datum.finishTime = finishTime; datum.status = state.name(); datum.diagnostics = diagnostics; + tezCounters = counters; } @Override @@ -62,6 +67,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { + ", taskAttemptId=" + datum.taskAttemptId + ", finishTime=" + datum.finishTime + ", status=" + datum.status - + ", diagnostics=" + datum.diagnostics; + + ", diagnostics=" + datum.diagnostics + + ", counters=" + tezCounters.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index 97c9f2f..5d29b21 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -28,15 +28,20 @@ import org.apache.tez.dag.records.TezTaskAttemptID; public class TaskAttemptStartedEvent implements HistoryEvent { private TaskAttemptStarted datum 
= new TaskAttemptStarted(); + private final String inProgressLogsUrl; + private final String completedLogsUrl; public TaskAttemptStartedEvent(TezTaskAttemptID taId, String vertexName, long startTime, - ContainerId containerId, NodeId nodeId) { + ContainerId containerId, NodeId nodeId, + String inProgressLogsUrl, String completedLogsUrl) { datum.taskAttemptId = taId.toString(); datum.vertexName = vertexName; datum.startTime = startTime; datum.containerId = containerId.toString(); datum.nodeId = nodeId.toString(); + this.inProgressLogsUrl = inProgressLogsUrl; + this.completedLogsUrl = completedLogsUrl; } @Override @@ -61,6 +66,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent { + ", taskAttemptId=" + datum.taskAttemptId + ", startTime=" + datum.startTime + ", containerId=" + datum.containerId - + ", nodeId=" + datum.nodeId; + + ", nodeId=" + datum.nodeId + + ", inProgressLogs=" + inProgressLogsUrl + + ", completedLogs=" + completedLogsUrl; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java index 60f810b..5408a72 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.history.events; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.avro.HistoryEventType; @@ -27,14 +28,17 @@ import org.apache.tez.dag.records.TezTaskID; public class TaskFinishedEvent implements HistoryEvent { private TaskFinished datum = new 
TaskFinished(); + // FIXME remove this when we have a proper history + private final TezCounters tezCounters; public TaskFinishedEvent(TezTaskID taskId, String vertexName, long finishTime, - TaskState state) { + TaskState state, TezCounters counters) { datum.vertexName = vertexName; datum.taskId = taskId.toString(); datum.finishTime = finishTime; datum.status = state.name(); + tezCounters = counters; } @Override @@ -58,6 +62,7 @@ public class TaskFinishedEvent implements HistoryEvent { return "vertexName=" + datum.vertexName + ", taskId=" + datum.taskId + ", finishTime=" + datum.finishTime - + ", status=" + datum.status; + + ", status=" + datum.status + + ", counters=" + tezCounters.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java index 7e86c66..ec13ebd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.history.events; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.avro.HistoryEventType; @@ -27,15 +28,19 @@ import org.apache.tez.dag.records.TezVertexID; public class VertexFinishedEvent implements HistoryEvent { private VertexFinished datum = new VertexFinished(); + // FIXME remove this when we have a proper history + private final TezCounters tezCounters; public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long finishTime, - VertexStatus.State state, String diagnostics) { + 
VertexStatus.State state, String diagnostics, + TezCounters counters) { datum.vertexName = vertexName; datum.vertexId = vertexId.toString(); datum.finishTime = finishTime; datum.status = state.name(); datum.diagnostics = diagnostics; + tezCounters = counters; } @Override @@ -60,6 +65,7 @@ public class VertexFinishedEvent implements HistoryEvent { + ", vertexId=" + datum.vertexId + ", finishTime=" + datum.finishTime + ", status=" + datum.status - + ", diagnostics=" + datum.diagnostics; + + ", diagnostics=" + datum.diagnostics + + ", counters=" + tezCounters.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index 4541a2e..cf01f43 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -839,7 +839,8 @@ public class TestAMContainer { resource = BuilderUtils.newResource(1024, 1); priority = BuilderUtils.newPriority(1); container = BuilderUtils.newContainer(containerID, nodeID, - nodeHttpAddress, resource, priority, null, rmIdentifier); + nodeHttpAddress, resource, priority, null); + chh = mock(ContainerHeartbeatHandler.class); InetSocketAddress addr = new InetSocketAddress("localhost", 0); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java 
index 6115c6f..627d583 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -72,6 +72,8 @@ public class MRRSleepJob extends Configured implements Tool { "mrr.sleepjob.ireduce.sleep.time"; public static String IREDUCE_STAGES_COUNT = "mrr.sleepjob.ireduces.stages.count"; + public static String IREDUCE_TASKS_COUNT = + "mrr.sleepjob.ireduces.tasks.count"; // Flags to inject failures public static String MAP_THROW_ERROR = "mrr.sleepjob.map.throw.error"; @@ -109,28 +111,21 @@ public class MRRSleepJob extends Configured implements Tool { InputSplit ignored, TaskAttemptContext taskContext) throws IOException { Configuration conf = taskContext.getConfiguration(); - String vertexName = conf.get( - org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME); - boolean isIntermediateReduce = - MultiStageMRConfigUtil.getIntermediateStageNum(vertexName) != -1; - - final int count = - (isIntermediateReduce)? - conf.getInt(IREDUCE_SLEEP_COUNT, 1) : - conf.getInt(MAP_SLEEP_COUNT, 1); - if (count < 0) throw new IOException("Invalid map count: " + count); - int totalIReduces = MultiStageMRConfigUtil.getNumIntermediateStages(conf); - boolean finalIReduce = totalIReduces == - (MultiStageMRConfigUtil.getIntermediateStageNum(vertexName) + 1); + final int count = conf.getInt(MAP_SLEEP_COUNT, 1); + if (count < 0) { + throw new IOException("Invalid map count: " + count); + } - final int redcount = finalIReduce? - conf.getInt(REDUCE_SLEEP_COUNT, 1) : - conf.getInt(IREDUCE_SLEEP_COUNT, 1); - if (redcount < 0) - throw new IOException("Invalid reduce count: " + redcount); + int totalIReduces = conf.getInt(IREDUCE_STAGES_COUNT, 1); - final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks()); + int reduceTasks = totalIReduces == 0? + taskContext.getNumReduceTasks() : + conf.getInt(IREDUCE_TASKS_COUNT, 1); + int sleepCount = totalIReduces == 0? 
+ conf.getInt(REDUCE_SLEEP_COUNT,1) : + conf.getInt(IREDUCE_SLEEP_COUNT,1); + final int emitPerMapTask = sleepCount * reduceTasks; return new RecordReader<IntWritable,IntWritable>() { private int records = 0; @@ -219,6 +214,9 @@ public class MRRSleepJob extends Configured implements Tool { ) throws IOException, InterruptedException { //it is expected that every map processes mapSleepCount number of records. try { + LOG.info("Reading in " + vertexName + + " taskid " + context.getTaskAttemptID().getTaskID().getId() + + " key " + key.get()); LOG.info("Sleeping in InitialMap" + ", vertexName=" + vertexName + ", taskAttemptId=" + context.getTaskAttemptID() @@ -228,7 +226,9 @@ public class MRRSleepJob extends Configured implements Tool { + (mapSleepDuration * (mapSleepCount - count))); context.setStatus("Sleeping... (" + (mapSleepDuration * (mapSleepCount - count)) + ") ms left"); - Thread.sleep(mapSleepDuration); + if ((mapSleepCount - count) > 0) { + Thread.sleep(mapSleepDuration); + } if (throwError || throwFatal) { throw new IOException("Throwing a simulated error from map"); } @@ -242,6 +242,9 @@ public class MRRSleepJob extends Configured implements Tool { // each reducer will get reduceSleepCount number of keys. 
int k = key.get(); for (int i = 0; i < value.get(); ++i) { + LOG.info("Writing in " + vertexName + + " taskid " + context.getTaskAttemptID().getTaskID().getId() + + " key " + (k+i) + " value 1"); context.write(new IntWritable(k + i), new IntWritable(1)); } } @@ -269,6 +272,10 @@ public class MRRSleepJob extends Configured implements Tool { Context context) throws IOException, InterruptedException { try { + LOG.info("Reading in " + vertexName + + " taskid " + context.getTaskAttemptID().getTaskID().getId() + + " key " + key.get()); + LOG.info("Sleeping in IntermediateReduce" + ", vertexName=" + vertexName + ", taskAttemptId=" + context.getTaskAttemptID() @@ -278,7 +285,9 @@ public class MRRSleepJob extends Configured implements Tool { + (iReduceSleepDuration * (iReduceSleepCount - count))); context.setStatus("Sleeping... (" + (iReduceSleepDuration * (iReduceSleepCount - count)) + ") ms left"); - Thread.sleep(iReduceSleepDuration); + if ((iReduceSleepCount - count) > 0) { + Thread.sleep(iReduceSleepDuration); + } } catch (InterruptedException ex) { throw (IOException)new IOException( @@ -290,6 +299,9 @@ public class MRRSleepJob extends Configured implements Tool { int k = key.get(); for (IntWritable value : values) { for (int i = 0; i < value.get(); ++i) { + LOG.info("Writing in " + vertexName + + " taskid " + context.getTaskAttemptID().getTaskID().getId() + + " key " + (k+i) + " value 1"); context.write(new IntWritable(k + i), new IntWritable(1)); } } @@ -318,6 +330,9 @@ public class MRRSleepJob extends Configured implements Tool { Context context) throws IOException { try { + LOG.info("Reading in " + vertexName + + " taskid " + context.getTaskAttemptID().getTaskID().getId() + + " key " + key.get()); LOG.info("Sleeping in FinalReduce" + ", vertexName=" + vertexName + ", taskAttemptId=" + context.getTaskAttemptID() @@ -327,8 +342,9 @@ public class MRRSleepJob extends Configured implements Tool { + (reduceSleepDuration * (reduceSleepCount - count))); 
context.setStatus("Sleeping... (" + (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left"); - Thread.sleep(reduceSleepDuration); - + if ((reduceSleepCount - count) > 0) { + Thread.sleep(reduceSleepDuration); + } } catch (InterruptedException ex) { throw (IOException)new IOException( @@ -357,6 +373,7 @@ public class MRRSleepJob extends Configured implements Tool { conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount); conf.setInt(MRJobConfig.NUM_MAPS, numMapper); conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount); + conf.setInt(IREDUCE_TASKS_COUNT, numIReducer); // Configure intermediate reduces conf.setInt( @@ -364,7 +381,7 @@ public class MRRSleepJob extends Configured implements Tool { iReduceStagesCount); LOG.info("Running MRR with " + iReduceStagesCount + " IR stages"); - for (int i = 0; i < iReduceStagesCount; ++i) { + for (int i = 1; i <= iReduceStagesCount; ++i) { // Set reducer class for intermediate reduce conf.setClass( MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
