Repository: tez Updated Branches: refs/heads/master d5ac3b75f -> ec9135145
TEZ-3856. API to access counters in InputInitializerContext (Prasanth Jayachandran via zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ec913514 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ec913514 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ec913514 Branch: refs/heads/master Commit: ec9135145fda48917b319b1accc273254c707ae5 Parents: d5ac3b7 Author: Zhiyuan Yang <[email protected]> Authored: Wed Oct 25 23:43:46 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Wed Oct 25 23:44:02 2017 -0700 ---------------------------------------------------------------------- .../apache/tez/runtime/api/InputInitializerContext.java | 7 +++++++ .../src/main/java/org/apache/tez/dag/app/dag/Vertex.java | 6 ++++++ .../app/dag/impl/TezRootInputInitializerContextImpl.java | 5 +++++ .../java/org/apache/tez/dag/app/dag/impl/VertexImpl.java | 11 ++++++++++- .../test/java/org/apache/tez/mapreduce/TezTestUtils.java | 6 ++++++ 5 files changed, 34 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java index 6a123cf..ccfac46 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; @@ -117,4 +118,10 @@ public interface InputInitializerContext { */ void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet); + /** + * Add custom counters + * + * @param tezCounters counters to add + */ + void addCounters(TezCounters tezCounters); } http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 4d0a4bf..ba7624c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -86,6 +86,12 @@ public interface Vertex extends Comparable<Vertex> { */ TezCounters getCachedCounters(); + /** + * Add custom counters to the vertex + * @param tezCounters counters to add + */ + void addCounters(TezCounters tezCounters); + int getMaxTaskConcurrency(); Map<TezTaskID, Task> getTasks(); Task getTask(TezTaskID taskID); http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java index 4ca4024..f713054 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; @@ -119,4 +120,8 @@ public class TezRootInputInitializerContextImpl implements manager.registerForVertexUpdates(vertexName, input.getName(), stateSet); } + @Override + public void addCounters(final TezCounters tezCounters) { + vertex.addCounters(tezCounters); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 209db5a..0bd73ee 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -231,6 +231,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl // must be a linked map for ordering volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>(); private Object fullCountersLock = new Object(); + private TezCounters counters = new TezCounters(); private TezCounters fullCounters = null; private TezCounters cachedCounters = null; private long cachedCountersTimestamp = 0; @@ -1189,6 +1190,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } TezCounters counters = new TezCounters(); + counters.incrAllCounters(this.counters); return incrTaskCounters(counters, tasks.values()); } finally { @@ -1217,13 +1219,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } TezCounters counters = new TezCounters(); + counters.incrAllCounters(this.counters); cachedCounters = incrTaskCounters(counters, tasks.values()); return cachedCounters; } finally { readLock.unlock(); } } - + + @Override + public void addCounters(final TezCounters tezCounters) { + counters.incrAllCounters(tezCounters); + } + @Override public int getMaxTaskConcurrency() { return vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, @@ -3308,6 +3316,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Private public void constructFinalFullcounters() { this.fullCounters = new TezCounters(); + this.fullCounters.incrAllCounters(counters); this.vertexStats = new VertexStats(); for (Task t : this.tasks.values()) { http://git-wip-us.apache.org/repos/asf/tez/blob/ec913514/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java index 8912ad2..369afbe 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java @@ -19,6 +19,7 @@ package org.apache.tez.mapreduce; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.records.TezDAGID; @@ -120,6 +121,11 @@ public class TezTestUtils { } @Override + public void addCounters(TezCounters tezCounters) { + throw new UnsupportedOperationException("addCounters not implemented in this mock"); + } + + @Override public UserPayload getUserPayload() { throw new UnsupportedOperationException("getUserPayload not implemented in this mock"); }
