Repository: tez Updated Branches: refs/heads/master 72c458a43 -> d0abd3d54
TEZ-3911: Optional min/max/avg aggr. task counters reported to HistoryLoggingService at final counter aggr (Vineet Garg, via Gopal V) Signed-off-by: Gopal V <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d0abd3d5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d0abd3d5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d0abd3d5 Branch: refs/heads/master Commit: d0abd3d54cbd864a388b7f1e66018c46db328e16 Parents: 72c458a Author: Vineet Garg <[email protected]> Authored: Wed May 9 12:02:15 2018 -0700 Committer: Gopal V <[email protected]> Committed: Wed May 9 12:02:15 2018 -0700 ---------------------------------------------------------------------- .../org/apache/tez/common/ATSConstants.java | 4 + .../common/counters/AbstractCounterGroup.java | 7 +- .../tez/common/counters/AbstractCounters.java | 11 +- .../counters/AggregateFrameworkCounter.java | 85 +++++++++++++ .../common/counters/AggregateTezCounter.java | 31 +++++ .../counters/AggregateTezCounterDelegate.java | 118 ++++++++++++++++++ .../common/counters/AggregateTezCounters.java | 119 +++++++++++++++++++ .../tez/common/counters/CounterGroupBase.java | 9 ++ .../common/counters/FileSystemCounterGroup.java | 9 +- .../common/counters/FrameworkCounterGroup.java | 10 +- .../apache/tez/common/counters/TezCounter.java | 8 ++ .../apache/tez/common/counters/TezCounters.java | 12 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 22 ++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 30 ++--- .../apache/tez/dag/history/utils/DAGUtils.java | 11 ++ .../tez/dag/app/TestMockDAGAppMaster.java | 84 +++++++++++++ 16 files changed, 539 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git 
a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 6e07849..47d536f 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -107,6 +107,10 @@ public class ATSConstants { public static final String COUNTER_NAME = "counterName"; public static final String COUNTER_DISPLAY_NAME = "counterDisplayName"; public static final String COUNTER_VALUE = "counterValue"; + public static final String COUNTER_MIN_VALUE = "counterMinValue"; + public static final String COUNTER_MAX_VALUE = "counterMaxValue"; + public static final String COUNTER_INSTANCE_COUNT = "counterInstanceCount"; + /* Url related */ public static final String RESOURCE_URI_BASE = "/ws/v1/timeline"; http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java index a4b153f..1d1b56d 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java @@ -194,10 +194,15 @@ public abstract class AbstractCounterGroup<T extends TezCounter> @Override public void incrAllCounters(CounterGroupBase<T> rightGroup) { + aggrAllCounters(rightGroup); + } + + @Override + public void aggrAllCounters(CounterGroupBase<T> rightGroup) { try { for (TezCounter right : rightGroup) { TezCounter left = findCounter(right.getName(), right.getDisplayName()); - left.increment(right.getValue()); + left.aggregate(right); } } catch (LimitExceededException e) { counters.clear(); 
http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java index 470cb78..5910164 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java @@ -354,13 +354,22 @@ public abstract class AbstractCounters<C extends TezCounter, * @param other the other Counters instance */ public synchronized void incrAllCounters(AbstractCounters<C, G> other) { + aggrAllCounters(other); + } + + /** + * Aggregates every counter group of another Counters instance into + * this instance. + * @param other the other Counters instance + */ + public synchronized void aggrAllCounters(AbstractCounters<C, G> other) { for(G right : other) { String groupName = right.getName(); G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName); if (left == null) { left = addGroup(groupName, right.getDisplayName()); } - left.incrAllCounters(right); + left.aggrAllCounters(right); } } http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/AggregateFrameworkCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateFrameworkCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateFrameworkCounter.java new file mode 100644 index 0000000..aa7d446 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateFrameworkCounter.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common.counters; + +import org.apache.tez.common.counters.FrameworkCounterGroup.FrameworkCounter; + +@SuppressWarnings("rawtypes") +public class AggregateFrameworkCounter<T extends Enum<T>> extends FrameworkCounter implements AggregateTezCounter { + + private long min = Long.MAX_VALUE; + private long max = Long.MIN_VALUE; + private long count = 0; + + @SuppressWarnings("unchecked") + public AggregateFrameworkCounter(Enum<T> ref, String groupName) { + super(ref, groupName); + } + + @Override + public void increment(long incr) { + throw new IllegalArgumentException("Cannot increment an aggregate counter directly"); + } + + @Override + public void aggregate(TezCounter other) { + final long val = other.getValue(); + final long othermax; + final long othermin; + final long othercount; + if (other instanceof AggregateTezCounter) { + othermax = ((AggregateTezCounter) other).getMax(); + othermin = ((AggregateTezCounter) other).getMin(); + othercount = ((AggregateTezCounter) other).getCount(); + } else { + othermin = othermax = val; + othercount = 1; + } + this.count += othercount; + super.increment(val); + if (this.min == Long.MAX_VALUE) { + this.min = othermin; + this.max = othermax; + return; + } + this.min = Math.min(this.min, othermin); + this.max = 
Math.max(this.max, othermax); + } + + @Override + public long getMin() { + return min; + } + + @Override + public long getMax() { + return max; + } + + @SuppressWarnings("unchecked") + public FrameworkCounter<T> asFrameworkCounter() { + return ((FrameworkCounter<T>)this); + } + + @Override + public long getCount() { + return count; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounter.java new file mode 100644 index 0000000..bf711da --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounter.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.common.counters; + +public interface AggregateTezCounter { + + public abstract void aggregate(TezCounter other); + + public abstract long getMin(); + + public abstract long getMax(); + + public abstract long getCount(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounterDelegate.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounterDelegate.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounterDelegate.java new file mode 100644 index 0000000..ae2ca7b --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounterDelegate.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.common.counters; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class AggregateTezCounterDelegate<T extends TezCounter> extends AbstractCounter implements AggregateTezCounter { + + private final T child; + private long min = Long.MAX_VALUE; + private long max = Long.MIN_VALUE; + private long count = 0; + + public AggregateTezCounterDelegate(T child) { + this.child = child; + } + + @Override + public String getName() { + return child.getName(); // this is a pass-through + } + + @Override + public String getDisplayName() { + return child.getDisplayName(); + } + + @Override + public long getValue() { + return child.getValue(); + } + + @Override + public void setValue(long value) { + this.child.setValue(value); + } + + @Override + public void increment(long incr) { + throw new UnsupportedOperationException("Cannot increment an aggregate counter"); + } + + /* (non-Javadoc) + * @see org.apache.tez.common.counters.AggregateTezCounter#aggregate(org.apache.tez.common.counters.TezCounter) + */ + @Override + public void aggregate(TezCounter other) { + final long val = other.getValue(); + final long othermax; + final long othermin; + final long othercount; + if (other instanceof AggregateTezCounter) { + othermax = ((AggregateTezCounter) other).getMax(); + othermin = ((AggregateTezCounter) other).getMin(); + othercount = ((AggregateTezCounter) other).getCount(); + } else { + othermin = othermax = val; + othercount = 1; + } + this.count += othercount; + this.child.increment(val); + if (this.min == Long.MAX_VALUE) { + this.min = othermin; + this.max = othermax; + return; + } + this.min = Math.min(this.min, othermin); + this.max = Math.max(this.max, othermax); + } + + @Override + public TezCounter getUnderlyingCounter() { + return this.child; + } + + @Override + public void readFields(DataInput arg0) throws IOException { + throw new UnsupportedOperationException("Cannot deserialize an aggregate 
counter"); + } + + @Override + public void write(DataOutput arg0) throws IOException { + throw new UnsupportedOperationException("Cannot deserialize an aggregate counter"); + } + + @Override + public long getMin() { + return min; + } + + @Override + public long getMax() { + return max; + } + + @Override + public long getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounters.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounters.java new file mode 100644 index 0000000..332c24a --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounters.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.common.counters; + +public class AggregateTezCounters extends TezCounters { + + private static final GroupFactory groupFactory = new GroupFactory(); + + public AggregateTezCounters() { + super(groupFactory); + } + + // Mix framework group implementation into CounterGroup interface + private static class AggregateFrameworkGroupImpl<T extends Enum<T>> + extends FrameworkCounterGroup<T, TezCounter> implements CounterGroup { + + AggregateFrameworkGroupImpl(Class<T> cls) { + super(cls); + } + + @Override + protected FrameworkCounter<T> newCounter(T key) { + return (new AggregateFrameworkCounter<T>(key, getName())) + .asFrameworkCounter(); + } + + @Override + public CounterGroupBase<TezCounter> getUnderlyingGroup() { + return this; + } + } + + // Mix generic group implementation into CounterGroup interface + // and provide some mandatory group factory methods. + private static class AggregateGenericGroup extends AbstractCounterGroup<TezCounter> + implements CounterGroup { + + AggregateGenericGroup(String name, String displayName, Limits limits) { + super(name, displayName, limits); + } + + @Override + protected TezCounter newCounter(String name, String displayName, long value) { + return new AggregateTezCounterDelegate<GenericCounter>(new GenericCounter(name, displayName, value)); + } + + @Override + protected TezCounter newCounter() { + return new AggregateTezCounterDelegate<GenericCounter>(new GenericCounter()); + } + + @Override + public CounterGroupBase<TezCounter> getUnderlyingGroup() { + return this; + } + } + + // Mix file system group implementation into the CounterGroup interface + private static class AggregateFileSystemGroup extends FileSystemCounterGroup<TezCounter> + implements CounterGroup { + + @Override + protected TezCounter newCounter(String scheme, FileSystemCounter key) { + return new AggregateTezCounterDelegate<FSCounter>(new FSCounter(scheme, key)); + } + + @Override + public CounterGroupBase<TezCounter> 
getUnderlyingGroup() { + return this; + } + } + + /** + * Provide factory methods for counter group factory implementation. + * See also the GroupFactory in + * {@link org.apache.hadoop.TezCounters.Counters mapred.Counters} + */ + private static class GroupFactory + extends CounterGroupFactory<TezCounter, CounterGroup> { + + @Override + protected <T extends Enum<T>> + FrameworkGroupFactory<CounterGroup> + newFrameworkGroupFactory(final Class<T> cls) { + return new FrameworkGroupFactory<CounterGroup>() { + @Override public CounterGroup newGroup(String name) { + return new AggregateFrameworkGroupImpl<T>(cls); // impl in this package + } + }; + } + + @Override + protected CounterGroup newGenericGroup(String name, String displayName, + Limits limits) { + return new AggregateGenericGroup(name, displayName, limits); + } + + @Override + protected CounterGroup newFileSystemGroup() { + return new AggregateFileSystemGroup(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java b/tez-api/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java index be4bf77..216d2f4 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java @@ -97,8 +97,17 @@ public interface CounterGroupBase<T extends TezCounter> * Increment all counters by a group of counters * @param rightGroup the group to be added to this group */ + @Deprecated void incrAllCounters(CounterGroupBase<T> rightGroup); + /** + * Aggregate all counters by a group of counters + * @param rightGroup the group to be added to this group + */ + public default void aggrAllCounters(CounterGroupBase<T> rightGroup) { + incrAllCounters(rightGroup); + } + @Private /** * 
Exposes the underlying group type if a facade. http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java index 5024154..3ea4acd 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java @@ -225,12 +225,17 @@ public abstract class FileSystemCounterGroup<C extends TezCounter> } @Override - public void incrAllCounters(CounterGroupBase<C> other) { + public void incrAllCounters(CounterGroupBase<C> rightGroup) { + aggrAllCounters(rightGroup); + } + + @Override + public void aggrAllCounters(CounterGroupBase<C> other) { if (checkNotNull(other.getUnderlyingGroup(), "other group") instanceof FileSystemCounterGroup<?>) { for (TezCounter counter : other) { FSCounter c = (FSCounter) ((TezCounter)counter).getUnderlyingCounter(); - findCounter(c.scheme, c.key) .increment(counter.getValue()); + findCounter(c.scheme, c.key) .aggregate(counter); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java index 3a4aa97..bcb6454 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java @@ -190,14 +190,20 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>, return n; } + @Override + 
@SuppressWarnings("deprecation") + public void incrAllCounters(CounterGroupBase<C> rightGroup) { + aggrAllCounters(rightGroup); + } + @SuppressWarnings("rawtypes") @Override - public void incrAllCounters(CounterGroupBase<C> other) { + public void aggrAllCounters(CounterGroupBase<C> other) { if (checkNotNull(other, "other counter group") instanceof FrameworkCounterGroup<?, ?>) { for (TezCounter counter : other) { findCounter(((FrameworkCounter) counter).key.name()) - .increment(counter.getValue()); + .aggregate(counter); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java index 2b40ed2..4cb1ae9 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java @@ -73,6 +73,14 @@ public interface TezCounter extends Writable { * @param incr the value to increase this counter by */ void increment(long incr); + + /** + * Aggregate this counter with another counter + * @param other TezCounter to aggregate with, by default this is incr(other.getValue()) + */ + public default void aggregate(TezCounter other) { + increment(other.getValue()); + }; /** * Return the underlying object if this is a facade. 
http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-api/src/main/java/org/apache/tez/common/counters/TezCounters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TezCounters.java b/tez-api/src/main/java/org/apache/tez/common/counters/TezCounters.java index ca03f41..a1205b9 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/TezCounters.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/TezCounters.java @@ -128,7 +128,17 @@ public class TezCounters extends AbstractCounters<TezCounter, CounterGroup> { * Default constructor */ public TezCounters() { - super(groupFactory); + this(groupFactory); + } + + /** + * Construct the Counters object using a custom counter group factory + * @param <C> the type of counter + * @param <G> the type of counter group + */ + public <C extends TezCounter, G extends CounterGroupBase<C>> TezCounters( + CounterGroupFactory<TezCounter, CounterGroup> customGroupFactory) { + super(customGroupFactory); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index ecd8d17..bd5e0ff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.counters.AggregateTezCounters; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; import 
org.apache.tez.dag.api.DagTypeConverters; @@ -700,7 +701,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, updateCpuCounters(); TezCounters counters = new TezCounters(); counters.incrAllCounters(dagCounters); - return incrTaskCounters(counters, vertices.values()); + return aggrTaskCounters(counters, vertices.values()); } finally { readLock.unlock(); @@ -732,7 +733,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, updateCpuCounters(); TezCounters counters = new TezCounters(); counters.incrAllCounters(dagCounters); - return incrTaskCounters(counters, vertices.values()); + return aggrTaskCounters(counters, vertices.values()); } finally { readLock.unlock(); @@ -748,10 +749,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, return false; } - public static TezCounters incrTaskCounters( + public static TezCounters aggrTaskCounters( TezCounters counters, Collection<Vertex> vertices) { for (Vertex vertex : vertices) { - counters.incrAllCounters(vertex.getAllCounters()); + counters.aggrAllCounters(vertex.getAllCounters()); } return counters; } @@ -1399,7 +1400,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, updateCpuCounters(); TezCounters counters = null; try { - counters = getAllCounters(); + counters = constructFinalFullcounters(); } catch (LimitExceededException e) { addDiagnostic("Counters limit exceeded: " + e.getMessage()); finalState = DAGState.FAILED; @@ -1868,17 +1869,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // Already constructed. Just return. 
return; } - this.constructFinalFullcounters(); + this.fullCounters = this.constructFinalFullcounters(); } } @Private - public void constructFinalFullcounters() { - this.fullCounters = new TezCounters(); - this.fullCounters.incrAllCounters(dagCounters); + public TezCounters constructFinalFullcounters() { + final AggregateTezCounters aggregateTezCounters = new AggregateTezCounters(); + aggregateTezCounters.aggrAllCounters(dagCounters); for (Vertex v : this.vertices.values()) { - this.fullCounters.incrAllCounters(v.getAllCounters()); + aggregateTezCounters.aggrAllCounters(v.getAllCounters()); } + return aggregateTezCounters; } /** http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f3fc269..0184657 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -63,6 +63,7 @@ import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.common.counters.AggregateTezCounters; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.io.NonSyncByteArrayOutputStream; @@ -1197,8 +1198,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } TezCounters counters = new TezCounters(); - counters.incrAllCounters(this.counters); - return incrTaskCounters(counters, tasks.values()); + counters.aggrAllCounters(this.counters); + return aggrTaskCounters(counters, tasks.values()); } finally { readLock.unlock(); @@ -1226,8 +1227,8 @@ 
public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } TezCounters counters = new TezCounters(); - counters.incrAllCounters(this.counters); - cachedCounters = incrTaskCounters(counters, tasks.values()); + counters.aggrAllCounters(this.counters); + cachedCounters = aggrTaskCounters(counters, tasks.values()); return cachedCounters; } finally { readLock.unlock(); @@ -1236,7 +1237,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public void addCounters(final TezCounters tezCounters) { - counters.incrAllCounters(tezCounters); + counters.aggrAllCounters(tezCounters); } @Override @@ -1335,10 +1336,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return false; } - public static TezCounters incrTaskCounters( + public static TezCounters aggrTaskCounters( TezCounters counters, Collection<Task> tasks) { for (Task task : tasks) { - counters.incrAllCounters(task.getCounters()); + counters.aggrAllCounters(task.getCounters()); } return counters; } @@ -2057,7 +2058,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl || !recoveryData.isVertexSucceeded()) { logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, logSuccessDiagnostics ? StringUtils.join(getDiagnostics(), LINE_SEPARATOR) : "", - getAllCounters()); + constructFinalFullcounters()); } } @@ -2066,7 +2067,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl || !recoveryData.isVertexFinished()) { TezCounters counters = null; try { - counters = getAllCounters(); + counters = constructFinalFullcounters(); } catch (LimitExceededException e) { // Ignore as failed vertex addDiagnostic("Counters limit exceeded: " + e.getMessage()); @@ -3325,7 +3326,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl // Already constructed. Just return. 
return; } - this.constructFinalFullcounters(); + this.fullCounters = this.constructFinalFullcounters(); } } @@ -3334,16 +3335,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } @Private - public void constructFinalFullcounters() { - this.fullCounters = new TezCounters(); - this.fullCounters.incrAllCounters(counters); + public TezCounters constructFinalFullcounters() { + AggregateTezCounters aggregateTezCounters = new AggregateTezCounters(); + aggregateTezCounters.aggrAllCounters(counters); this.vertexStats = new VertexStats(); for (Task t : this.tasks.values()) { vertexStats.updateStats(t.getReport()); TezCounters counters = t.getCounters(); - this.fullCounters.incrAllCounters(counters); + aggregateTezCounters.aggrAllCounters(counters); } + return aggregateTezCounters; } private static class RootInputInitFailedTransition implements http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index dce9e52..b2622ad 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -34,6 +34,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.VersionInfo; +import org.apache.tez.common.counters.AggregateTezCounter; +import org.apache.tez.common.counters.AggregateTezCounterDelegate; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -153,6 +155,15 @@ public class DAGUtils { counter.getDisplayName()); } 
counterMap.put(ATSConstants.COUNTER_VALUE, counter.getValue()); + if (counter instanceof AggregateTezCounter) { + counterMap.put(ATSConstants.COUNTER_INSTANCE_COUNT, + ((AggregateTezCounter)counter).getCount()); + counterMap.put(ATSConstants.COUNTER_MAX_VALUE, + ((AggregateTezCounter)counter).getMax()); + counterMap.put(ATSConstants.COUNTER_MIN_VALUE, + ((AggregateTezCounter)counter).getMin()); + + } counterList.add(counterMap); } } http://git-wip-us.apache.org/repos/asf/tez/blob/d0abd3d5/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 6268912..859537b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.tez.common.counters.AggregateFrameworkCounter; +import org.apache.tez.common.counters.AggregateTezCounterDelegate; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; @@ -457,6 +459,88 @@ public class TestMockDAGAppMaster { tezClient.stop(); } + @Test + public void testCountersAggregation() throws Exception { + TezConfiguration tezconf = new TezConfiguration(defaultConf); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, + null, false, false); + tezClient.start(); + + final String vAName = "A"; + final String vBName = "B"; + final String procCounterName = "Proc"; + final String globalCounterName = "Global"; + DAG dag = 
DAG.create("testCountersAggregation"); + Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10); + Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 1); + dag.addVertex(vA) + .addVertex(vB) + .addEdge( + Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out"), InputDescriptor.create("In")))); + TezCounters temp = new TezCounters(); + temp.findCounter(new String(globalCounterName), new String(globalCounterName)).increment(1); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(bos); + temp.write(out); + final byte[] payload = bos.toByteArray(); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(false); + mockApp.countersDelegate = new CountersDelegate() { + int counterValue = 0; + @Override + public TezCounters getCounters(TaskSpec taskSpec) { + String vName = taskSpec.getVertexName(); + TezCounters counters = new TezCounters(); + final DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(ByteBuffer.wrap(payload)); + try { + // this ensures that the serde code path is covered. + // the internal merges of counters covers the constructor code path. 
+ counters.readFields(in); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + counters.findCounter(vName, procCounterName).setValue(++counterValue); + for (OutputSpec output : taskSpec.getOutputs()) { + counters.findCounter(vName, output.getDestinationVertexName()).setValue(++counterValue); + } + for (InputSpec input : taskSpec.getInputs()) { + counters.findCounter(vName, input.getSourceVertexName()).setValue(++counterValue); + } + return counters; + } + }; + mockApp.doSleep = false; + DAGClient dagClient = tezClient.submitDAG(dag); + mockLauncher.waitTillContainersLaunched(); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + mockLauncher.startScheduling(true); + DAGStatus status = dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState()); + TezCounters counters = dagImpl.getAllCounters(); + + // verify processor counters + VertexImpl vAImpl = (VertexImpl) dagImpl.getVertex(vAName); + VertexImpl vBImpl = (VertexImpl) dagImpl.getVertex(vBName); + TezCounters vACounters = vAImpl.getAllCounters(); + TezCounters vBCounters = vBImpl.getAllCounters(); + + Assert.assertEquals(19, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, procCounterName)).getMax()); + Assert.assertEquals(1, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, procCounterName)).getMin()); + Assert.assertEquals(20, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, vBName)).getMax()); + Assert.assertEquals(2, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, vBName)).getMin()); + + Assert.assertEquals(21, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, procCounterName)).getMin()); + Assert.assertEquals(21, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, procCounterName)).getMax()); + Assert.assertEquals(22, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, vAName)).getMin()); + Assert.assertEquals(22, 
((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, vAName)).getMax()); + + tezClient.stop(); + } @Test (timeout = 10000) public void testBasicCounters() throws Exception {
