TEZ-2234. Add API for statistics information - allow vertex managers to get output size per source vertex (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bd9b8d95 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bd9b8d95 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bd9b8d95 Branch: refs/heads/master Commit: bd9b8d9516a127c98fc9b987fae84898e4207145 Parents: c8ef244 Author: Bikas Saha <[email protected]> Authored: Fri Apr 10 22:52:11 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Apr 10 22:52:11 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/dag/api/VertexManagerPluginContext.java | 21 +++ .../apache/tez/runtime/api/InputContext.java | 7 + .../apache/tez/runtime/api/InputStatistics.java | 40 +++++ .../runtime/api/InputStatisticsReporter.java | 37 +++++ .../apache/tez/runtime/api/OutputContext.java | 7 + .../tez/runtime/api/OutputStatistics.java | 41 +++++ .../runtime/api/OutputStatisticsReporter.java | 37 +++++ .../tez/runtime/api/VertexStatistics.java | 58 +++++++ .../java/org/apache/tez/dag/app/dag/Task.java | 1 - .../java/org/apache/tez/dag/app/dag/Vertex.java | 5 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 + .../apache/tez/dag/app/dag/impl/TaskImpl.java | 21 ++- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 128 ++++++++++++++- .../tez/dag/app/dag/impl/VertexManager.java | 20 ++- .../apache/tez/dag/app/MockDAGAppMaster.java | 11 +- .../tez/dag/app/TestMockDAGAppMaster.java | 156 +++++++++++++++++++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 4 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 11 +- .../api/events/TaskStatusUpdateEvent.java | 17 +- .../tez/runtime/api/impl/IOStatistics.java | 30 ++++ .../tez/runtime/api/impl/TaskStatistics.java | 71 +++++++++ .../runtime/api/impl/TezInputContextImpl.java | 28 +++- .../runtime/api/impl/TezOutputContextImpl.java | 26 +++- .../api/impl/TezProcessorContextImpl.java | 4 +- .../runtime/api/impl/TezTaskContextImpl.java | 5 +- 
.../apache/tez/runtime/task/TaskReporter.java | 18 ++- .../library/input/OrderedGroupedKVInput.java | 5 + .../runtime/library/input/UnorderedKVInput.java | 5 + .../output/OrderedPartitionedKVOutput.java | 12 +- .../library/output/UnorderedKVOutput.java | 11 +- .../output/UnorderedPartitionedKVOutput.java | 11 +- .../library/output/TestOnFileSortedOutput.java | 17 +- .../output/TestOnFileUnorderedKVOutput.java | 18 ++- 34 files changed, 843 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7d1127f..e12edaa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2234. Add API for statistics information - allow vertex managers to get + output size per source vertex TEZ-2274. Tez UI: full data loading, client side search and sort for other pages TEZ-2301. Switch Tez Pre-commit builds to use tezqa user. TEZ-2299. Invalid dag creation in MRRSleepJob post TEZ-2293. 
http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index 38ecbf6..ab4ced0 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import com.google.common.base.Preconditions; @@ -60,6 +61,7 @@ public interface VertexManagerPluginContext { return locationHint; } } + /** * Get the edge properties on the input edges of this vertex. The input edge * is represented by the source vertex name @@ -68,6 +70,25 @@ public interface VertexManagerPluginContext { public Map<String, EdgeProperty> getInputVertexEdgeProperties(); /** + * Get the edge properties on the output edges of this vertex. The output edge + * is represented by the destination vertex name + * @return Map of destination vertex name and edge property + */ + public Map<String, EdgeProperty> getOutputVertexEdgeProperties(); + + /** + * Get a {@link VertexStatistics} object to find out execution statistics + * about the given {@link Vertex}. + * <br>This only provides point in time values for the statistics and must be + * called again to get updated values. 
+ * + * @param vertexName + * Name of the {@link Vertex} + * @return {@link VertexStatistics} for the given vertex + */ + public VertexStatistics getVertexStatistics(String vertexName); + + /** * Get the name of the vertex * @return Vertex name */ http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java index fc02878..479a7db 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java @@ -46,4 +46,11 @@ public interface InputContext extends TaskContext { * This method can be invoked multiple times. */ public void inputIsReady(); + + /** + * Get an {@link InputStatisticsReporter} for this {@link Input} that can + * be used to report statistics like data size + * @return {@link InputStatisticsReporter} + */ + public InputStatisticsReporter getStatisticsReporter(); } http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java new file mode 100644 index 0000000..fb99f2d --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java @@ -0,0 +1,40 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.runtime.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.tez.dag.api.Vertex; + +/** + * Provides various statistics about physical execution activity happening on a + * logical input in a {@link Vertex}. Inputs can be external inputs or inputs + * from other vertices. + */ +@Public +@Evolving +public interface InputStatistics { + + /** + * Returns the data size associated with this logical input + * <br>It is the size of the data read from this input by the vertex. + * @return Data size in bytes + */ + public long getDataSize(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java new file mode 100644 index 0000000..68a56e7 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Report statistics about the {@link Input} + */ +@Public +@Evolving +public interface InputStatisticsReporter { + + /** + * Report the size of the logical data read + * @param size of data in bytes + */ + public void reportDataSize(long size); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java index a0c7194..882eb4b 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java @@ -40,5 +40,12 @@ public interface OutputContext extends TaskContext { * @return index */ public int getOutputIndex(); + + /** + * Get an {@link OutputStatisticsReporter} for this {@link Output} that can + * be used to report statistics like data size + * @return {@link OutputStatisticsReporter} + */ + public 
OutputStatisticsReporter getStatisticsReporter(); } http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java new file mode 100644 index 0000000..0373606 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java @@ -0,0 +1,41 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.runtime.api; + + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.tez.dag.api.Vertex; + +/** + * Provides various statistics about physical execution activity happening on a + * logical output in a {@link Vertex}. Outputs can be external outputs or + * outputs to other vertices. + */ +@Public +@Evolving +public interface OutputStatistics { + + /** + * Returns the data size associated with this logical output + * <br>It is the size of the data written to this output by the vertex. 
+ * @return Data size in bytes + */ + public long getDataSize(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java new file mode 100644 index 0000000..fc9f1b7 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Report statistics about the {@link Output} + */ +@Public +@Evolving +public interface OutputStatisticsReporter { + + /** + * Report the size of the logical data written + * @param size of the data in bytes + */ + public void reportDataSize(long size); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java new file mode 100644 index 0000000..aa526f2 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java @@ -0,0 +1,58 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.tez.runtime.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.Vertex; + +/** + * Provides various statistics about the physical execution of this + * {@link Vertex}<br> + * This only provides point in time values for the statistics and values are + * refreshed based on when the implementations of the inputs/outputs/tasks etc. + * update their reported statistics. The values may increase or decrease based + * on task completions or failures. + */ +@Public +@Evolving +public interface VertexStatistics { + + /** + * Get statistics about an {@link Edge} input or external input of this + * {@link Vertex}. <br> + * + * @param inputName + * Name of the input {@link Edge} or external input of this vertex + * @return {@link InputStatistics} for the given input + */ + public InputStatistics getInputStatistics(String inputName); + + /** + * Get statistics about an {@link Edge} output or external output of this + * {@link Vertex}. 
<br> + * + * @param outputName + * Name of the output {@link Edge} or external output of this vertex + * @return {@link OutputStatistics} for the given output + */ + public OutputStatistics getOutputStatistics(String outputName); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java index 98a85cf..b798fce 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java @@ -41,7 +41,6 @@ public interface Task { Map<TezTaskAttemptID, TaskAttempt> getAttempts(); TaskAttempt getAttempt(TezTaskAttemptID attemptID); TaskAttempt getSuccessfulAttempt(); - /** Has Task reached the final state or not. */ boolean isFinished(); http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 44df6cb..6c85b85 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.app.dag; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -52,6 +53,7 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; import 
org.apache.tez.runtime.api.impl.OutputSpec; @@ -66,6 +68,7 @@ public interface Vertex extends Comparable<Vertex> { public VertexPlan getVertexPlan(); int getDistanceFromRoot(); + LinkedHashMap<String, Integer> getIOIndices(); String getName(); VertexState getState(); @@ -107,6 +110,8 @@ public interface Vertex extends Comparable<Vertex> { void setInputVertices(Map<Vertex, Edge> inVertices); void setOutputVertices(Map<Vertex, Edge> outVertices); + VertexStatistics getStatistics(); + Map<Vertex, Edge> getInputVertices(); Map<Vertex, Edge> getOutputVertices(); http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 4f92fa6..1af4274 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -103,6 +103,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; @@ -147,6 +148,8 @@ public class TaskAttemptImpl implements TaskAttempt, @VisibleForTesting TaskAttemptStatus reportedStatus; private DAGCounter localityCounter; + + org.apache.tez.runtime.api.impl.TaskStatistics statistics; // Used to store locality information when Set<String> taskHosts = new HashSet<String>(); @@ -517,6 +520,10 @@ public class TaskAttemptImpl implements TaskAttempt, readLock.unlock(); } } + + TaskStatistics 
getStatistics() { + return this.statistics; + } @Override public float getProgress() { @@ -1259,6 +1266,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.reportedStatus.state = ta.getState(); ta.reportedStatus.progress = statusEvent.getProgress(); ta.reportedStatus.counters = statusEvent.getCounters(); + ta.statistics = statusEvent.getStatistics(); ta.updateProgressSplits(); http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 8290cf5..10a688f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -92,6 +92,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.OutputCommitter; +import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import com.google.common.annotations.VisibleForTesting; @@ -140,7 +141,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private boolean historyTaskStartGenerated = false; - private static final SingleArcTransition<TaskImpl, TaskEvent> + private static final SingleArcTransition<TaskImpl , TaskEvent> ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); private static final SingleArcTransition<TaskImpl, TaskEvent> KILL_TRANSITION = new KillTransition(); @@ -149,7 +150,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { boolean recoveryStartEventSeen = false; private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback(); - + private static final StateMachineFactory <TaskImpl, 
TaskStateInternal, TaskEventType, TaskEvent> stateMachineFactory @@ -349,7 +350,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { stateMachineFactory.make(this), this); augmentStateMachine(); } - + @Override public Map<TezTaskAttemptID, TaskAttempt> getAttempts() { readLock.lock(); @@ -433,6 +434,20 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { readLock.unlock(); } } + + TaskStatistics getStatistics() { + // simply return the stats from the best attempt + readLock.lock(); + try { + TaskAttemptImpl bestAttempt = (TaskAttemptImpl) selectBestAttempt(); + if (bestAttempt == null) { + return null; + } + return bestAttempt.getStatistics(); + } finally { + readLock.unlock(); + } + } @Override public float getProgress() { http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 2d892b0..82f7a33 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -150,9 +150,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; +import org.apache.tez.runtime.api.InputStatistics; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.OutputStatistics; +import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; import 
org.apache.tez.runtime.api.events.InputFailedEvent; @@ -166,6 +169,7 @@ import org.apache.tez.runtime.api.impl.EventType; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import com.google.common.annotations.VisibleForTesting; @@ -660,6 +664,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Map<Vertex, Edge> sourceVertices; private Map<Vertex, Edge> targetVertices; Set<Edge> uninitializedEdges = Sets.newHashSet(); + // using a linked hash map to conveniently map edge names to a contiguous index + LinkedHashMap<String, Integer> ioIndices = Maps.newLinkedHashMap(); private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> rootInputDescriptors; @@ -718,6 +724,62 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts; + private VertexStatisticsImpl finalStatistics; + + + static class IOStatisticsImpl extends org.apache.tez.runtime.api.impl.IOStatistics + implements InputStatistics, OutputStatistics { + + @Override + public long getDataSize() { + return super.getDataSize(); + } + + void mergeFrom(org.apache.tez.runtime.api.impl.IOStatistics other) { + this.setDataSize(this.getDataSize() + other.getDataSize()); + } + + } + + class VertexStatisticsImpl implements VertexStatistics { + final Map<String, IOStatisticsImpl> ioStats; + + public VertexStatisticsImpl() { + ioStats = Maps.newHashMapWithExpectedSize(ioIndices.size()); + for (String name : ioIndices.keySet()) { + ioStats.put(name, new IOStatisticsImpl()); + } + } + + public IOStatisticsImpl getIOStatistics(String ioName) { + return ioStats.get(ioName); + } + + void mergeFrom(TaskStatistics taskStats) { + if (taskStats == null) { + return; + } + + for (Map.Entry<String, 
org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats.getIOStatistics().entrySet()) { + String edgeName = entry.getKey(); + IOStatisticsImpl myEdgeStat = ioStats.get(edgeName); + Preconditions.checkState(myEdgeStat != null, "Unexpected IO name: " + edgeName + + " for vertex:" + getLogIdentifier()); + myEdgeStat.mergeFrom(entry.getValue()); + } + } + + @Override + public InputStatistics getInputStatistics(String inputName) { + return getIOStatistics(inputName); + } + + @Override + public OutputStatistics getOutputStatistics(String outputName) { + return getIOStatistics(outputName); + } + } + public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, String vertexName, Configuration dagConf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Clock clock, @@ -855,6 +917,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, public int getDistanceFromRoot() { return distanceFromRoot; } + + @Override + public LinkedHashMap<String, Integer> getIOIndices() { + return ioIndices; + } @Override public String getName() { @@ -931,9 +998,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, readLock.lock(); try { - VertexState state = getInternalState(); - if (state == VertexState.ERROR || state == VertexState.FAILED - || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { + if (inTerminalState()) { this.mayBeConstructFinalFullCounters(); return fullCounters; } @@ -950,9 +1015,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, readLock.lock(); try { - VertexState state = getInternalState(); - if (state == VertexState.ERROR || state == VertexState.FAILED - || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { + if (inTerminalState()) { this.mayBeConstructFinalFullCounters(); return this.vertexStats; } @@ -965,6 +1028,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } + boolean inTerminalState() { + VertexState state = 
getInternalState(); + if (state == VertexState.ERROR || state == VertexState.FAILED + || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { + return true; + } + return false; + } public static TezCounters incrTaskCounters( TezCounters counters, Collection<Task> tasks) { @@ -1705,6 +1776,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } if (vertex.completedTaskCount == vertex.tasks.size()) { + // finished - gather stats + vertex.finalStatistics = vertex.constructStatistics(); + //Only succeed if tasks complete successfully and no terminationCause is registered. if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) { LOG.info("Vertex succeeded: " + vertex.logIdentifier); @@ -2221,7 +2295,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return VertexState.FAILED; } } - + checkTaskLimits(); return VertexState.INITED; } @@ -3403,6 +3477,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private void mayBeConstructFinalFullCounters() { // Calculating full-counters. This should happen only once for the vertex. synchronized (this.fullCountersLock) { + // TODO this is broken after rerun if (this.fullCounters != null) { // Already constructed. Just return. 
return; @@ -3410,6 +3485,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.constructFinalFullcounters(); } } + + private VertexStatisticsImpl constructStatistics() { + VertexStatisticsImpl stats = new VertexStatisticsImpl(); + for (Task t : this.tasks.values()) { + TaskStatistics taskStats = ((TaskImpl)t).getStatistics(); + stats.mergeFrom(taskStats); + } + + return stats; + } @Private public void constructFinalFullcounters() { @@ -3710,6 +3795,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, (new TaskRescheduledTransition()).transition(vertex, event); // inform the DAG that we are re-running vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId())); + // back to running. so reset final cached stats + vertex.finalStatistics = null; return VertexState.RUNNING; } @@ -4049,18 +4136,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public void setInputVertices(Map<Vertex, Edge> inVertices) { this.sourceVertices = inVertices; + for (Vertex vertex : sourceVertices.keySet()) { + addIO(vertex.getName()); + } } @Override public void setOutputVertices(Map<Vertex, Edge> outVertices) { this.targetVertices = outVertices; + for (Vertex vertex : targetVertices.keySet()) { + addIO(vertex.getName()); + } } @Override public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) { this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size()); for (RootInputLeafOutputProto input : inputs) { - + addIO(input.getName()); InputDescriptor id = DagTypeConverters .convertInputDescriptorFromDAGPlan(input.getIODescriptor()); @@ -4106,6 +4199,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size()); this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size()); for (RootInputLeafOutputProto output : outputs) { + addIO(output.getName()); OutputDescriptor od = 
DagTypeConverters .convertOutputDescriptorFromDAGPlan(output.getIODescriptor()); @@ -4181,6 +4275,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, public Map<Vertex, Edge> getOutputVertices() { return Collections.unmodifiableMap(this.targetVertices); } + + @Override + public VertexStatistics getStatistics() { + readLock.lock(); + try { + if (inTerminalState()) { + Preconditions.checkState(this.finalStatistics != null); + return this.finalStatistics; + } + return constructStatistics(); + } finally { + readLock.unlock(); + } + } @Override public int getInputVerticesCount() { @@ -4214,6 +4322,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, readLock.unlock(); } } + + void addIO(String name) { + ioIndices.put(StringInterner.weakIntern(name), ioIndices.size()); + } @VisibleForTesting String getProcessorName() { http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 4bf51a1..0be0aaa 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -67,6 +67,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TezEvent; @@ -122,7 +123,6 @@ public class VertexManager { @Override public synchronized Map<String, EdgeProperty> getInputVertexEdgeProperties() { 
checkAndThrowIfDone(); - // TODO Something similar for Initial Inputs - payload etc visible Map<Vertex, Edge> inputs = managedVertex.getInputVertices(); Map<String, EdgeProperty> vertexEdgeMap = Maps.newHashMapWithExpectedSize(inputs.size()); @@ -133,6 +133,24 @@ public class VertexManager { } @Override + public synchronized Map<String, EdgeProperty> getOutputVertexEdgeProperties() { + checkAndThrowIfDone(); + Map<Vertex, Edge> outputs = managedVertex.getOutputVertices(); + Map<String, EdgeProperty> vertexEdgeMap = + Maps.newHashMapWithExpectedSize(outputs.size()); + for (Map.Entry<Vertex, Edge> entry : outputs.entrySet()) { + vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty()); + } + return vertexEdgeMap; + } + + @Override + public synchronized VertexStatistics getVertexStatistics(String vertexName) { + checkAndThrowIfDone(); + return appContext.getCurrentDAG().getVertex(vertexName).getStatistics(); + } + + @Override public synchronized String getVertexName() { checkAndThrowIfDone(); return managedVertex.getName(); http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 3390b02..fca15fd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -67,6 +67,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import 
org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; @@ -91,6 +92,7 @@ public class MockDAGAppMaster extends DAGAppMaster { boolean startFailFlag; boolean sendDMEvents; CountersDelegate countersDelegate; + StatisticsDelegate statsDelegate; long launcherSleepTime = 1; boolean doSleep = true; int handlerConcurrency = 1; @@ -101,6 +103,9 @@ public class MockDAGAppMaster extends DAGAppMaster { AtomicLong heartbeatTime = new AtomicLong(0); AtomicLong numHearbeats = new AtomicLong(0); + public static interface StatisticsDelegate { + public TaskStatistics getStatistics(TaskSpec taskSpec); + } public static interface CountersDelegate { public TezCounters getCounters(TaskSpec taskSpec); } @@ -402,10 +407,14 @@ public class MockDAGAppMaster extends DAGAppMaster { if (countersDelegate != null) { counters = countersDelegate.getCounters(cData.taskSpec); } + TaskStatistics stats = null; + if (statsDelegate != null) { + stats = statsDelegate.getStatistics(cData.taskSpec); + } cData.numUpdates++; float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1; float progress = updateProgress ? 
cData.numUpdates/maxUpdates : 0f; - events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress), new EventMetaData( + events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData( EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))); TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000); http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 1b8acab..8be60c5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -43,10 +43,12 @@ import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSourceDescriptor; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.DataSinkDescriptor; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -59,7 +61,9 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData; +import org.apache.tez.dag.app.MockDAGAppMaster.StatisticsDelegate; import 
org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError; import org.apache.tez.dag.app.dag.impl.DAGImpl; @@ -68,10 +72,13 @@ import org.apache.tez.dag.app.dag.impl.VertexImpl; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.impl.IOStatistics; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Assert; import org.junit.Ignore; @@ -332,6 +339,91 @@ public class TestMockDAGAppMaster { tezClient.stop(); } + @Test (timeout = 10000) + public void testBasicStatistics() throws Exception { + TezConfiguration tezconf = new TezConfiguration(defaultConf); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, + null, false, false); + tezClient.start(); + + final String vAName = "A"; + final String vBName = "B"; + final String sourceName = "In"; + final String sinkName = "Out"; + DAG dag = DAG.create("testBasisStatistics"); + Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 3); + Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 2); + vA.addDataSource(sourceName, + DataSourceDescriptor.create(InputDescriptor.create("In"), null, null)); + vB.addDataSink(sinkName, DataSinkDescriptor.create(OutputDescriptor.create("Out"), null, null)); + dag.addVertex(vA) + .addVertex(vB) + .addEdge( + Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER, + DataSourceType.PERSISTED, 
SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out"), InputDescriptor.create("In")))); + IOStatistics ioStats = new IOStatistics(); + ioStats.setDataSize(1); + TaskStatistics vAStats = new TaskStatistics(); + vAStats.addIO(vBName, ioStats); + vAStats.addIO(sourceName, ioStats); + TaskStatistics vBStats = new TaskStatistics(); + vBStats.addIO(vAName, ioStats); + vBStats.addIO(sinkName, ioStats); + ByteArrayOutputStream bosA = new ByteArrayOutputStream(); + DataOutput outA = new DataOutputStream(bosA); + vAStats.write(outA); + final byte[] payloadA = bosA.toByteArray(); + ByteArrayOutputStream bosB = new ByteArrayOutputStream(); + DataOutput outB = new DataOutputStream(bosB); + vBStats.write(outB); + final byte[] payloadB = bosB.toByteArray(); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(false); + mockApp.statsDelegate = new StatisticsDelegate() { + @Override + public TaskStatistics getStatistics(TaskSpec taskSpec) { + byte[] payload = payloadA; + TaskStatistics stats = new TaskStatistics(); + if (taskSpec.getVertexName().equals(vBName)) { + payload = payloadB; + } + final DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(ByteBuffer.wrap(payload)); + try { + // this ensures that the serde code path is covered. 
+ stats.readFields(in); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + return stats; + } + }; + mockApp.doSleep = false; + DAGClient dagClient = tezClient.submitDAG(dag); + mockLauncher.waitTillContainersLaunched(); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + mockLauncher.startScheduling(true); + DAGStatus status = dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState()); + + // verify that the values have been correct aggregated + for (org.apache.tez.dag.app.dag.Vertex v : dagImpl.getVertices().values()) { + VertexStatistics vStats = v.getStatistics(); + if (v.getName().equals(vAName)) { + Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getDataSize()); + Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getDataSize()); + } else { + Assert.assertEquals(2, vStats.getInputStatistics(vAName).getDataSize()); + Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getDataSize()); + } + } + + tezClient.stop(); + } + private void checkMemory(String name, MockDAGAppMaster mockApp) { long mb = 1024*1024; @@ -401,6 +493,70 @@ public class TestMockDAGAppMaster { checkMemory(dag.getName(), mockApp); tezClient.stop(); } + + @Ignore + @Test (timeout = 60000) + public void testBasicStatisticsMemory() throws Exception { + Logger.getRootLogger().setLevel(Level.WARN); + TezConfiguration tezconf = new TezConfiguration(defaultConf); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, + null, false, false); + tezClient.start(); + + final String vAName = "abcdefghijklmnopqrstuvwxyz"; + int numTasks = 10000; + int numSources = 10; + + IOStatistics ioStats = new IOStatistics(); + ioStats.setDataSize(1); + TaskStatistics vAStats = new TaskStatistics(); + + DAG dag = DAG.create("testBasisStatistics"); + Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), numTasks); + for (int i=0; i<numSources; ++i) { + final String 
sourceName = i + vAName; + vA.addDataSource(sourceName, + DataSourceDescriptor.create(InputDescriptor.create(sourceName), null, null)); + vAStats.addIO(sourceName, ioStats); + } + dag.addVertex(vA); + + ByteArrayOutputStream bosA = new ByteArrayOutputStream(); + DataOutput outA = new DataOutputStream(bosA); + vAStats.write(outA); + final byte[] payloadA = bosA.toByteArray(); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(false); + mockApp.statsDelegate = new StatisticsDelegate() { + @Override + public TaskStatistics getStatistics(TaskSpec taskSpec) { + byte[] payload = payloadA; + TaskStatistics stats = new TaskStatistics(); + final DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(ByteBuffer.wrap(payload)); + try { + // this ensures that the serde code path is covered. + stats.readFields(in); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + return stats; + } + }; + mockApp.doSleep = false; + DAGClient dagClient = tezClient.submitDAG(dag); + mockLauncher.waitTillContainersLaunched(); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + mockLauncher.startScheduling(true); + DAGStatus status = dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState()); + Assert.assertEquals(numTasks, + dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getDataSize()); + checkMemory(dag.getName(), mockApp); + tezClient.stop(); + } @Test (timeout = 10000) public void testMultipleSubmissions() throws Exception { http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 224976c..2a2df7c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -630,7 +630,7 @@ public class TestTaskAttempt { arg.getAllValues().subList(0, expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); - taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f))); + taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null))); taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0", TaskAttemptTerminationCause.APPLICATION_ERROR)); @@ -732,7 +732,7 @@ public class TestTaskAttempt { arg.getAllValues().subList(0, expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1); - taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f))); + taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null))); taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE)); http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 56b2627..0b048da 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -27,7 +27,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; 
import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -45,7 +44,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.RunnableWithNdc; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -75,6 +73,7 @@ import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezInputContextImpl; import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl; @@ -86,7 +85,6 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -119,6 +117,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { /** Maps which will be provided to the processor run method */ private final LinkedHashMap<String, LogicalInput> runInputMap; private final LinkedHashMap<String, LogicalOutput> runOutputMap; + + private final TaskStatistics statistics; private final Map<String, ByteBuffer> serviceConsumerMetadata; private final Map<String, String> envMap; @@ -182,6 +182,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.objectRegistry = 
objectRegistry; this.ExecutionContext = ExecutionContext; this.memAvailable = memAvailable; + this.statistics = new TaskStatistics(); } /** @@ -324,6 +325,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { startRouterThread(); } + + public TaskStatistics getTaskStatistics() { + return statistics; + } public void run() throws Exception { Preconditions.checkState(this.state.get() == State.INITED, http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java index 47c2998..875a345 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java @@ -25,18 +25,21 @@ import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.impl.TaskStatistics; public class TaskStatusUpdateEvent extends Event implements Writable { private TezCounters tezCounters; private float progress; + private TaskStatistics statistics; public TaskStatusUpdateEvent() { } - public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) { + public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics) { this.tezCounters = tezCounters; this.progress = progress; + this.statistics = statistics; } public TezCounters getCounters() { @@ -46,6 +49,10 @@ public class TaskStatusUpdateEvent extends Event implements Writable { public float getProgress() { return progress; } + + public TaskStatistics 
getStatistics() { + return statistics; + } @Override public void write(DataOutput out) throws IOException { @@ -56,6 +63,12 @@ public class TaskStatusUpdateEvent extends Event implements Writable { } else { out.writeBoolean(false); } + if (statistics != null) { + out.writeBoolean(true); + statistics.write(out); + } else { + out.writeBoolean(false); + } } @Override @@ -65,6 +78,10 @@ public class TaskStatusUpdateEvent extends Event implements Writable { tezCounters = new TezCounters(); tezCounters.readFields(in); } + if (in.readBoolean()) { + statistics = new TaskStatistics(); + statistics.readFields(in); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java new file mode 100644 index 0000000..ede9205 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package org.apache.tez.runtime.api.impl; + +public class IOStatistics { + private long dataSize = 0; + + public void setDataSize(long size) { + this.dataSize = size; + } + + public long getDataSize() { + return dataSize; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java new file mode 100644 index 0000000..b50d099 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.tez.runtime.api.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringInterner; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +public class TaskStatistics implements Writable { + // The memory usage of this is minimal (<10MB for 10K tasks x 10 inputs) + // TestMockDAGAppMaster#testBasicStatisticsMemory + private Map<String, IOStatistics> ioStatistics = Maps.newConcurrentMap(); + + public void addIO(String edgeName) { + addIO(edgeName, new IOStatistics()); + } + + public void addIO(String edgeName, IOStatistics stats) { + Preconditions.checkArgument(stats != null, edgeName); + ioStatistics.put(StringInterner.weakIntern(edgeName), stats); + } + + public Map<String, IOStatistics> getIOStatistics() { + return ioStatistics; + } + + @Override + public void write(DataOutput out) throws IOException { + int numEntries = ioStatistics.size(); + out.writeInt(numEntries); + for (Map.Entry<String, IOStatistics> entry : ioStatistics.entrySet()) { + IOStatistics edgeStats = entry.getValue(); + Text.writeString(out, entry.getKey()); + out.writeLong(edgeStats.getDataSize()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int numEntries = in.readInt(); + for (int i=0; i<numEntries; ++i) { + String edgeName = Text.readString(in); + IOStatistics edgeStats = new IOStatistics(); + edgeStats.setDataSize(in.readLong()); + addIO(edgeName, edgeStats); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index bd41aed..a9b7eab 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -37,9 +37,10 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.InputReadyTracker; -import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputStatisticsReporter; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.ObjectRegistry; @@ -55,6 +56,18 @@ public class TezInputContextImpl extends TezTaskContextImpl private final int inputIndex; private final Map<String, LogicalInput> inputs; private final InputReadyTracker inputReadyTracker; + private final InputStatisticsReporterImpl statsReporter; + + class InputStatisticsReporterImpl implements InputStatisticsReporter { + + @Override + public synchronized void reportDataSize(long size) { + // this is a concurrent map. 
Plus we are not adding/deleting entries + runtimeTask.getTaskStatistics().getIOStatistics().get(sourceVertexName) + .setDataSize(size); + } + + } @Private public TezInputContextImpl(Configuration conf, String[] workDirs, @@ -63,7 +76,7 @@ public class TezInputContextImpl extends TezTaskContextImpl String taskVertexName, String sourceVertexName, int vertexParallelism, TezTaskAttemptID taskAttemptID, int inputIndex, @Nullable UserPayload userPayload, - RuntimeTask runtimeTask, + LogicalIOProcessorRuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs, @@ -86,9 +99,11 @@ public class TezInputContextImpl extends TezTaskContextImpl taskAttemptID); this.inputs = inputs; this.inputReadyTracker = inputReadyTracker; + runtimeTask.getTaskStatistics().addIO(sourceVertexName); + statsReporter = new InputStatisticsReporterImpl(); } - private static TezCounters wrapCounters(RuntimeTask task, String taskVertexName, + private static TezCounters wrapCounters(LogicalIOProcessorRuntimeTask task, String taskVertexName, String edgeVertexName, Configuration conf) { TezCounters tezCounters = task.addAndGetTezCounter(edgeVertexName); if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, @@ -134,4 +149,9 @@ public class TezInputContextImpl extends TezTaskContextImpl public void inputIsReady() { inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName)); } -} \ No newline at end of file + + @Override + public InputStatisticsReporter getStatisticsReporter() { + return statsReporter; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 8d758f0..17513dd 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -36,11 +36,12 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.common.resources.MemoryDistributor; @@ -51,6 +52,18 @@ public class TezOutputContextImpl extends TezTaskContextImpl private final String destinationVertexName; private final EventMetaData sourceInfo; private final int outputIndex; + private final OutputStatisticsReporterImpl statsReporter; + + class OutputStatisticsReporterImpl implements OutputStatisticsReporter { + + @Override + public synchronized void reportDataSize(long size) { + // this is a concurrent map. 
Plus we are not adding/deleting entries + runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName) + .setDataSize(size); + } + + } @Private public TezOutputContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, @@ -59,7 +72,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl String destinationVertexName, int vertexParallelism, TezTaskAttemptID taskAttemptID, int outputIndex, - @Nullable UserPayload userPayload, RuntimeTask runtimeTask, + @Nullable UserPayload userPayload, LogicalIOProcessorRuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry, @@ -76,9 +89,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl this.destinationVertexName = destinationVertexName; this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT, taskVertexName, destinationVertexName, taskAttemptID); + runtimeTask.getTaskStatistics().addIO(destinationVertexName); + statsReporter = new OutputStatisticsReporterImpl(); } - private static TezCounters wrapCounters(RuntimeTask runtimeTask, String taskVertexName, + private static TezCounters wrapCounters(LogicalIOProcessorRuntimeTask runtimeTask, String taskVertexName, String edgeVertexName, Configuration conf) { TezCounters tezCounters = runtimeTask.addAndGetTezCounter(edgeVertexName); if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, @@ -119,4 +134,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl public int getOutputIndex() { return outputIndex; } + + @Override + public OutputStatisticsReporter getStatisticsReporter() { + return statsReporter; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index edfd8c9..a74ccac 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -37,7 +37,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.InputReadyTracker; -import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; @@ -55,7 +55,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, TezUmbilical tezUmbilical, String dagName, String vertexName, int vertexParallelism, TezTaskAttemptID taskAttemptID, - @Nullable UserPayload userPayload, RuntimeTask runtimeTask, + @Nullable UserPayload userPayload, LogicalIOProcessorRuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry, http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index 527b822..6c0a869 
100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EntityDescriptor; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.MemoryUpdateCallback; @@ -53,7 +54,7 @@ public abstract class TezTaskContextImpl implements TaskContext { private final TezCounters counters; private String[] workDirs; private String uniqueIdentifier; - protected final RuntimeTask runtimeTask; + protected final LogicalIOProcessorRuntimeTask runtimeTask; protected final TezUmbilical tezUmbilical; private final Map<String, ByteBuffer> serviceConsumerMetadata; private final int appAttemptNumber; @@ -69,7 +70,7 @@ public abstract class TezTaskContextImpl implements TaskContext { @Private public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, String dagName, String taskVertexName, int vertexParallelism, - TezTaskAttemptID taskAttemptID, TezCounters counters, RuntimeTask runtimeTask, + TezTaskAttemptID taskAttemptID, TezCounters counters, LogicalIOProcessorRuntimeTask runtimeTask, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry, http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 4c07f5a..48be8bd 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -223,7 +223,7 @@ public class TaskReporter { eventsToSend.drainTo(events); if (!task.isTaskDone() && !task.hadFatalError()) { - TezCounters counters = null; + boolean sendCounters = false; /** * Increasing the heartbeat interval can delay the delivery of events. Sending just updated * records would save CPU in DAG AM, but certain counters are updated very frequently. Until @@ -231,11 +231,10 @@ public class TaskReporter { */ // Not completely accurate, since OOB heartbeats could go out. if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) { - counters = task.getCounters(); + sendCounters = true; prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get(); } - updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()), - updateEventMetadata); + updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata); events.add(updateEvent); } @@ -313,12 +312,16 @@ public class TaskReporter { * indicates an exception somewhere in the AM. */ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), - task.getProgress()), updateEventMetadata); + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata); return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; } + + private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) { + return new TaskStatusUpdateEvent((sendCounters ? 
task.getCounters() : null), + task.getProgress(), task.getTaskStatistics()); + } /** * Sends out final events for task failure. @@ -334,8 +337,7 @@ public class TaskReporter { */ private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), - task.getProgress()), updateEventMetadata); + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); if (diagnostics == null) { diagnostics = ExceptionUtils.getStackTrace(t); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index a807bad..d2d9ed8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -185,6 +185,11 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { if (shuffle != null) { shuffle.shutdown(); } + + long outputSize = getContext().getCounters() + .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue(); + getContext().getStatisticsReporter().reportDataSize(outputSize); + return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 6ad58ba..f2f47e1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -208,6 +208,11 @@ public class UnorderedKVInput extends AbstractLogicalInput { if (this.shuffleManager != null) { this.shuffleManager.shutdown(); } + + long outputSize = getContext().getCounters() + .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue(); + getContext().getStatisticsReporter().reportDataSize(outputSize); + return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 518d214..b03c674 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.TezConfiguration; import 
org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; @@ -159,16 +161,22 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { @Override public synchronized List<Event> close() throws IOException { + List<Event> returnEvents = null; if (sorter != null) { sorter.flush(); sorter.close(); this.endTime = System.nanoTime(); - return generateEvents(); + returnEvents = generateEvents(); } else { LOG.warn("Attempting to close output " + getContext().getDestinationVertexName() + " before it was started"); - return Collections.emptyList(); + returnEvents = Collections.emptyList(); } + + long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); + getContext().getStatisticsReporter().reportDataSize(outputSize); + + return returnEvents; } private List<Event> generateEvents() throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 9914735..6c9077e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.AbstractLogicalOutput; @@ -120,12 
+121,18 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { @Override public synchronized List<Event> close() throws Exception { + List<Event> returnEvents = null; if (isStarted.get()) { //TODO: Do we need to support sending payloads via events? - return kvWriter.close(); + returnEvents = kvWriter.close(); } else { - return Collections.emptyList(); + returnEvents = Collections.emptyList(); } + + long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); + getContext().getStatisticsReporter().reportDataSize(outputSize); + + return returnEvents; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index af66c62..8b83f9b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; @@ -97,11 +98,17 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { @Override public synchronized List<Event> close() throws Exception { + List<Event> returnEvents = null; if (isStarted.get()) { - return 
kvWriter.close(); + returnEvents = kvWriter.close(); } else { - return Collections.emptyList(); + returnEvents = Collections.emptyList(); } + + long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); + getContext().getStatisticsReporter().reportDataSize(outputSize); + + return returnEvents; } private static final Set<String> confKeys = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index cfe1e6f..8e43f21 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.output; import com.google.protobuf.ByteString; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +33,7 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.library.api.KeyValuesWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @@ -56,6 +58,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertFalse; @@ -82,6 +85,8 @@ public class TestOnFileSortedOutput { private int partitions; //For sorter (pipelined / Default) private int sorterThreads; + + final AtomicLong outputSize = new AtomicLong(); private KeyValuesWriter writer; private OrderedPartitionedKVOutput sortedOutput; @@ -123,6 +128,7 @@ public class TestOnFileSortedOutput { conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionViaEvent); + outputSize.set(0); fs.mkdirs(workingDir); this.partitions = Math.max(1, rnd.nextInt(10)); } @@ -335,7 +341,15 @@ public class TestOnFileSortedOutput { serviceProviderMetaData.writeInt(PORT); TezCounters counters = new TezCounters(); - + + OutputStatisticsReporter reporter = mock(OutputStatisticsReporter.class); + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { + outputSize.set((Long) invocation.getArguments()[0]); + return null; + } + }).when(reporter).reportDataSize(anyLong()); + OutputContext context = mock(OutputContext.class); doReturn(counters).when(context).getCounters(); doReturn(workingDirs).when(context).getWorkDirs(); @@ -357,6 +371,7 @@ public class TestOnFileSortedOutput { }).when(context).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class)); ExecutionContext ExecutionContext = mock(ExecutionContext.class); doReturn(HOST).when(ExecutionContext).getHostName(); + doReturn(reporter).when(context).getStatisticsReporter(); doReturn(ExecutionContext).when(context).getExecutionContext(); return context; }
