Repository: tez Updated Branches: refs/heads/master 87aac1270 -> f46997a7c
TEZ-2308. Add set/get of record counts in task/vertex statistics (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f46997a7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f46997a7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f46997a7 Branch: refs/heads/master Commit: f46997a7cbc5474e3368c852f31a95c97da3b6a4 Parents: 87aac12 Author: Bikas Saha <[email protected]> Authored: Tue Apr 21 14:01:59 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue Apr 21 14:01:59 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/runtime/api/InputStatistics.java | 9 +++++ .../runtime/api/InputStatisticsReporter.java | 10 ++++++ .../tez/runtime/api/OutputStatistics.java | 9 +++++ .../runtime/api/OutputStatisticsReporter.java | 9 +++++ .../apache/tez/dag/app/dag/impl/VertexImpl.java | 17 +++++----- .../tez/dag/app/TestMockDAGAppMaster.java | 12 +++++-- .../org/apache/tez/mapreduce/input/MRInput.java | 4 +++ .../tez/mapreduce/input/MultiMRInput.java | 5 +++ .../apache/tez/mapreduce/output/MROutput.java | 4 +++ .../tez/runtime/api/impl/IOStatistics.java | 35 +++++++++++++++++++- .../tez/runtime/api/impl/TaskStatistics.java | 4 +-- .../runtime/api/impl/TezInputContextImpl.java | 7 ++++ .../runtime/api/impl/TezOutputContextImpl.java | 7 ++++ .../library/input/OrderedGroupedKVInput.java | 7 ++-- .../runtime/library/input/UnorderedKVInput.java | 7 ++-- .../output/OrderedPartitionedKVOutput.java | 5 ++- .../library/output/UnorderedKVOutput.java | 6 ++-- .../output/UnorderedPartitionedKVOutput.java | 5 ++- .../library/output/TestOnFileSortedOutput.java | 15 +++++++-- .../output/TestOnFileUnorderedKVOutput.java | 2 ++ 21 files changed, 157 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4ba2f2f..5293120 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2308. Add set/get of record counts in task/vertex statistics TEZ-2344. Tez UI: Equip basic-ember-table's cell level loading for all use cases in all DAGs table TEZ-2313. Regression in handling obsolete events in ShuffleScheduler. TEZ-2212. Notify components on DAG completion. http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java index fb99f2d..1066dbb 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java @@ -37,4 +37,13 @@ public interface InputStatistics { * @return Data size in bytes */ public long getDataSize(); + + /** + * Get the numbers of items processed. These could be key-value pairs, table + * records etc. + * + * @return Number of items processed + */ + public long getItemsProcessed(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java index 68a56e7..a85d25b 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java @@ -34,4 +34,14 @@ public interface InputStatisticsReporter { */ public void reportDataSize(long size); + /** + * Report the number of items processed. These could be key-value pairs, table + * records etc. + * + * @param items + * number of items + */ + public void reportItemsProcessed(long items); + + } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java index 0373606..2f18a03 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java @@ -38,4 +38,13 @@ public interface OutputStatistics { * @return Data size in bytes */ public long getDataSize(); + + /** + * Get the numbers of items processed. These could be key-value pairs, table + * records etc. + * + * @return Number of items processed + */ + public long getItemsProcessed(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java index fc9f1b7..1931e5c 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java @@ -34,4 +34,13 @@ public interface OutputStatisticsReporter { */ public void reportDataSize(long size); + /** + * Report the number of items processed. These could be key-value pairs, table + * records etc. + * + * @param items + * number of items + */ + public void reportItemsProcessed(long items); + } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index b4b8062..5dfcb8e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -788,10 +788,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return super.getDataSize(); } - void mergeFrom(org.apache.tez.runtime.api.impl.IOStatistics other) { - this.setDataSize(this.getDataSize() + other.getDataSize()); + @Override + public long getItemsProcessed() { + return super.getItemsProcessed(); } - } class VertexStatisticsImpl implements VertexStatistics { @@ -813,12 +813,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return; } - for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats.getIOStatistics().entrySet()) { - String edgeName = entry.getKey(); - IOStatisticsImpl myEdgeStat = ioStats.get(edgeName); - Preconditions.checkState(myEdgeStat != null, "Unexpected IO name: " + edgeName + for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats + .getIOStatistics().entrySet()) { + String ioName = entry.getKey(); + IOStatisticsImpl myIOStat = ioStats.get(ioName); + Preconditions.checkState(myIOStat != null, "Unexpected IO name: " + ioName + " for vertex:" + getLogIdentifier()); - myEdgeStat.mergeFrom(entry.getValue()); + myIOStat.mergeFrom(entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 4d01117..db1d632 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -376,6 +376,7 @@ public class TestMockDAGAppMaster { OutputDescriptor.create("Out"), InputDescriptor.create("In")))); IOStatistics ioStats = new IOStatistics(); ioStats.setDataSize(1); + ioStats.setItemsProcessed(1); TaskStatistics vAStats = new TaskStatistics(); vAStats.addIO(vBName, ioStats); vAStats.addIO(sourceName, ioStats); @@ -426,10 +427,14 @@ public class TestMockDAGAppMaster { VertexStatistics vStats = v.getStatistics(); if (v.getName().equals(vAName)) { Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getDataSize()); - Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getDataSize()); + Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getDataSize()); + Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getItemsProcessed()); + Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getItemsProcessed()); } else { Assert.assertEquals(2, vStats.getInputStatistics(vAName).getDataSize()); - Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getDataSize()); + Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getDataSize()); + Assert.assertEquals(2, vStats.getInputStatistics(vAName).getItemsProcessed()); + Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getItemsProcessed()); } } @@ -521,6 +526,7 @@ public class TestMockDAGAppMaster { IOStatistics ioStats = new IOStatistics(); ioStats.setDataSize(1); + ioStats.setItemsProcessed(1); TaskStatistics vAStats = new TaskStatistics(); DAG dag = DAG.create("testBasisStatistics"); @@ -566,6 +572,8 @@ public class TestMockDAGAppMaster { Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState()); Assert.assertEquals(numTasks, dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getDataSize()); + Assert.assertEquals(numTasks, + dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getItemsProcessed()); checkMemory(dag.getName(), mockApp); tezClient.stop(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 2fb1647..991f6d1 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -532,6 +532,10 @@ public class MRInput extends MRInputBase { @Override public List<Event> close() throws IOException { mrReader.close(); + long inputRecords = getContext().getCounters() + .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(inputRecords); + return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index 9596f07..425d737 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java @@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; +import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.mapreduce.input.base.MRInputBase; import org.apache.tez.mapreduce.lib.MRInputUtils; import org.apache.tez.mapreduce.lib.MRReader; @@ -187,6 +188,10 @@ public class MultiMRInput extends MRInputBase { for (MRReader reader : readers) { reader.close(); } + long inputRecords = getContext().getCounters() + .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(inputRecords); + return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 349a894..483c92b 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -507,6 +507,10 @@ public class MROutput extends AbstractLogicalOutput { @Override public synchronized List<Event> close() throws IOException { flush(); + long outputRecords = getContext().getCounters() + .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); + return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java index ede9205..0f8b589 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java @@ -17,8 +17,15 @@ package org.apache.tez.runtime.api.impl; -public class IOStatistics { +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class IOStatistics implements Writable { private long dataSize = 0; + private long numItems = 0; public void setDataSize(long size) { this.dataSize = size; @@ -27,4 +34,30 @@ public class IOStatistics { public long getDataSize() { return dataSize; } + + public void setItemsProcessed(long items) { + this.numItems = items; + } + + public long getItemsProcessed() { + return numItems; + } + + public void mergeFrom(org.apache.tez.runtime.api.impl.IOStatistics other) { + this.setDataSize(this.getDataSize() + other.getDataSize()); + this.setItemsProcessed(this.getItemsProcessed() + other.getItemsProcessed()); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(getDataSize()); + out.writeLong(getItemsProcessed()); + } + + @Override + public void readFields(DataInput in) throws IOException { + setDataSize(in.readLong()); + setItemsProcessed(in.readLong()); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java index b50d099..0b4bef8 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java @@ -54,7 +54,7 @@ public class TaskStatistics implements Writable { for (Map.Entry<String, IOStatistics> entry : ioStatistics.entrySet()) { IOStatistics edgeStats = entry.getValue(); Text.writeString(out, entry.getKey()); - out.writeLong(edgeStats.getDataSize()); + edgeStats.write(out); } } @@ -64,7 +64,7 @@ public class TaskStatistics implements Writable { for (int i=0; i<numEntries; ++i) { String edgeName = Text.readString(in); IOStatistics edgeStats = new IOStatistics(); - edgeStats.setDataSize(in.readLong()); + edgeStats.readFields(in); addIO(edgeName, edgeStats); } } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index a9b7eab..f6330f3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -66,6 +66,13 @@ public class TezInputContextImpl extends TezTaskContextImpl runtimeTask.getTaskStatistics().getIOStatistics().get(sourceVertexName) .setDataSize(size); } + + @Override + public void reportItemsProcessed(long items) { + // this is a concurrent map. Plus we are not adding/deleting entries + runtimeTask.getTaskStatistics().getIOStatistics().get(sourceVertexName) + .setItemsProcessed(items); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 17513dd..4045113 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -62,6 +62,13 @@ public class TezOutputContextImpl extends TezTaskContextImpl runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName) .setDataSize(size); } + + @Override + public void reportItemsProcessed(long items) { + // this is a concurrent map. Plus we are not adding/deleting entries + runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName) + .setItemsProcessed(items);; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index d2d9ed8..e61dbdc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -186,9 +186,12 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { shuffle.shutdown(); } - long outputSize = getContext().getCounters() + long dataSize = getContext().getCounters() .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue(); - getContext().getStatisticsReporter().reportDataSize(outputSize); + getContext().getStatisticsReporter().reportDataSize(dataSize); + long inputRecords = getContext().getCounters() + .findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(inputRecords); return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index f2f47e1..ce27103 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -209,9 +209,12 @@ public class UnorderedKVInput extends AbstractLogicalInput { this.shuffleManager.shutdown(); } - long outputSize = getContext().getCounters() + long dataSize = getContext().getCounters() .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue(); - getContext().getStatisticsReporter().reportDataSize(outputSize); + getContext().getStatisticsReporter().reportDataSize(dataSize); + long inputRecords = getContext().getCounters() + .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(inputRecords); return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index df6daf2..40edc76 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -192,7 +192,10 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); getContext().getStatisticsReporter().reportDataSize(outputSize); - + long outputRecords = getContext().getCounters() + .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); + return returnEvents; } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 6c9077e..2c26374 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -44,7 +44,6 @@ import org.apache.tez.runtime.library.api.Partitioner; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter; -import org.apache.tez.runtime.library.partitioner.HashPartitioner; import com.google.common.annotations.VisibleForTesting; @@ -131,7 +130,10 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); getContext().getStatisticsReporter().reportDataSize(outputSize); - + long outputRecords = getContext().getCounters() + .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); + return returnEvents; } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 8b83f9b..34f2e3e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -107,7 +107,10 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); getContext().getStatisticsReporter().reportDataSize(outputSize); - + long outputRecords = getContext().getCounters() + .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); + getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); + return returnEvents; } http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 30f78fe..19eb18a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -72,7 +72,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; - +@SuppressWarnings({ "rawtypes", "unchecked" }) @RunWith(Parameterized.class) public class TestOnFileSortedOutput { private static final Random rnd = new Random(); @@ -90,6 +90,7 @@ public class TestOnFileSortedOutput { private int sorterThreads; final AtomicLong outputSize = new AtomicLong(); + final AtomicLong numRecords = new AtomicLong(); private KeyValuesWriter writer; private OrderedPartitionedKVOutput sortedOutput; @@ -135,6 +136,7 @@ public class TestOnFileSortedOutput { sendEmptyPartitionViaEvent); outputSize.set(0); + numRecords.set(0); fs.mkdirs(workingDir); this.partitions = Math.max(1, rnd.nextInt(10)); } @@ -271,10 +273,12 @@ public class TestOnFileSortedOutput { startSortedOutput(partitions); //Write random set of keys + long recordsWritten = numRecords.get(); for (int i = 0; i < Math.max(1, rnd.nextInt(50)); i++) { Text key = new Text(new BigInteger(256, rnd).toString()); LinkedList values = new LinkedList(); for (int j = 0; j < Math.max(2, rnd.nextInt(10)); j++) { + recordsWritten++; values.add(new Text(new BigInteger(256, rnd).toString())); } writer.write(key, values); @@ -282,7 +286,7 @@ public class TestOnFileSortedOutput { List<Event> eventList = sortedOutput.close(); assertTrue(eventList != null && eventList.size() == 2); - + assertEquals(recordsWritten, numRecords.get()); ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto .parseFrom( @@ -358,6 +362,13 @@ public class TestOnFileSortedOutput { return null; } }).when(reporter).reportDataSize(anyLong()); + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { + numRecords.set((Long) invocation.getArguments()[0]); + return null; + } + }).when(reporter).reportItemsProcessed(anyLong()); + OutputContext context = mock(OutputContext.class); doReturn(counters).when(context).getCounters(); http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 41eb9a4..2b25daf 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -81,6 +81,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +@SuppressWarnings("rawtypes") public class TestOnFileUnorderedKVOutput { private static final Logger LOG = LoggerFactory.getLogger(TestOnFileUnorderedKVOutput.class); @@ -139,6 +140,7 @@ public class TestOnFileUnorderedKVOutput { events = kvOutput.close(); assertEquals(45, stats.getIOStatistics().values().iterator().next().getDataSize()); + assertEquals(5, stats.getIOStatistics().values().iterator().next().getItemsProcessed()); assertTrue(events != null && events.size() == 1); CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
