Repository: tez
Updated Branches:
  refs/heads/master e36f962e7 -> 765afd236
TEZ-2360. per-io counters flag should generate both overall and per-edge counters (pramachandran)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/765afd23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/765afd23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/765afd23

Branch: refs/heads/master
Commit: 765afd236fd178ebc315c1f912102578ec100c70
Parents: e36f962
Author: Prakash Ramachandran <[email protected]>
Authored: Fri May 1 00:48:16 2015 +0530
Committer: Prakash Ramachandran <[email protected]>
Committed: Fri May 1 00:48:16 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../runtime/api/impl/TezCountersDelegate.java | 74 +++++++-
 .../java/org/apache/tez/test/TestTezJobs.java | 184 +++++++++++++++++++
 3 files changed, 255 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d67db81..aa72320 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
 TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+ TEZ-2360. per-io counters flag should generate both overall and per-edge counters
 TEZ-2389. Tez UI: Sort by attempt-no is incorrect in attempts pages.
 TEZ-2383. Cleanup input/output/processor contexts in LogicalIOProcessorRuntimeTask.
 TEZ-2084. 
Tez UI: Stacktrace format info is lost in diagnostics http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java index 5286839..3c79530 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java @@ -18,7 +18,12 @@ package org.apache.tez.runtime.api.impl; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.common.counters.AbstractCounter; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -27,7 +32,7 @@ public class TezCountersDelegate extends TezCounters { private final String groupModifier; private final TezCounters original; - + public TezCountersDelegate(TezCounters original, String taskVertexName, String edgeVertexName, String type) { this.original = original; @@ -39,10 +44,71 @@ public class TezCountersDelegate extends TezCounters { // the standard mechanism to find a counter. 
@Override public TezCounter findCounter(String groupName, String counterName) { + String simpleGroupName; if (groupName.equals(TaskCounter.class.getName())) { - groupName = TaskCounter.class.getSimpleName(); + simpleGroupName = TaskCounter.class.getSimpleName(); + } else { + simpleGroupName = groupName; + } + String modifiedGroupName = simpleGroupName + "_" + this.groupModifier; + final TezCounter modifiedGroupCounter = original.findCounter(modifiedGroupName, counterName); + final TezCounter originalGroupCounter = original.findCounter(groupName, counterName); + return new CompositeCounter(modifiedGroupCounter, originalGroupCounter); + } + + /* + * A counter class to wrap multiple counters. increment operation will increment both counters + */ + private static class CompositeCounter extends AbstractCounter { + + TezCounter modifiedCounter; + TezCounter originalCounter; + + public CompositeCounter(TezCounter modifiedCounter, TezCounter originalCounter) { + this.modifiedCounter = modifiedCounter; + this.originalCounter = originalCounter; + } + + @Override + public String getName() { + return modifiedCounter.getName(); + } + + @Override + public String getDisplayName() { + return modifiedCounter.getName(); + } + + @Override + public long getValue() { + return modifiedCounter.getValue(); + } + + @Override + public void setValue(long value) { + modifiedCounter.setValue(value); + originalCounter.setValue(value); + } + + @Override + public void increment(long increment) { + modifiedCounter.increment(increment); + originalCounter.increment(increment); + } + + @Override + public TezCounter getUnderlyingCounter() { + return this; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + assert false : "shouldn't be called"; + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + assert false : "shouldn't be called"; } - String modifiedGroupName = groupName + "_" + this.groupModifier; - return 
original.findCounter(modifiedGroupName, counterName); } } http://git-wip-us.apache.org/repos/asf/tez/blob/765afd23/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index db212fa..13b0c03 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -19,6 +19,7 @@ package org.apache.tez.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -29,6 +30,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.BitSet; import java.util.EnumSet; import java.util.HashSet; @@ -41,6 +43,12 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -313,6 +321,103 @@ public class TestTezJobs { } @Test(timeout = 60000) + public void testPerIOCounterAggregation() throws Exception { + String baseDir = "/tmp/perIOCounterAgg/"; + Path inPath1 = new Path(baseDir + "inPath1"); + Path inPath2 = new Path(baseDir + "inPath2"); + Path outPath = new Path(baseDir + "outPath"); + final Set<String> expectedResults = generateSortMergeJoinInput(inPath1, inPath2); + Path 
stagingDirPath = new Path("/tmp/tez-staging-dir"); + remoteFs.mkdirs(stagingDirPath); + + TezConfiguration conf = new TezConfiguration(mrrTezCluster.getConfig()); + conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true); + TezClient tezClient = TezClient.create(SortMergeJoinHelper.class.getSimpleName(), conf); + tezClient.start(); + + SortMergeJoinHelper sortMergeJoinHelper = new SortMergeJoinHelper(tezClient); + sortMergeJoinHelper.setConf(conf); + + String[] args = new String[] { + "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + stagingDirPath.toString(), + inPath1.toString(), inPath2.toString(), "1", outPath.toString() }; + assertEquals(0, sortMergeJoinHelper.run(conf, args, tezClient)); + + verifySortMergeJoinInput(outPath, expectedResults); + + String joinerVertexName = "joiner"; + String input1Name = "input1"; + String input2Name = "input2"; + String joinOutputName = "joinOutput"; + Set<StatusGetOpts> statusOpts = new HashSet<StatusGetOpts>(); + statusOpts.add(StatusGetOpts.GET_COUNTERS); + VertexStatus joinerVertexStatus = + sortMergeJoinHelper.dagClient.getVertexStatus(joinerVertexName, statusOpts); + final TezCounters joinerCounters = joinerVertexStatus.getVertexCounters(); + final CounterGroup aggregatedGroup = joinerCounters.getGroup(TaskCounter.class.getCanonicalName()); + final CounterGroup input1Group = joinerCounters.getGroup( + TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_INPUT_" + input1Name); + final CounterGroup input2Group = joinerCounters.getGroup( + TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_INPUT_" + input2Name); + assertTrue("aggregated counter group cannot be empty", aggregatedGroup.size() > 0); + assertTrue("per io group for input1 cannot be empty", input1Group.size() > 0); + assertTrue("per io group for input1 cannot be empty", input2Group.size() > 0); + + List<TaskCounter> countersToVerifyAgg = Arrays.asList( + TaskCounter.ADDITIONAL_SPILLS_BYTES_READ, + 
TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN, + TaskCounter.COMBINE_INPUT_RECORDS, + TaskCounter.MERGED_MAP_OUTPUTS, + TaskCounter.NUM_DISK_TO_DISK_MERGES, + TaskCounter.NUM_FAILED_SHUFFLE_INPUTS, + TaskCounter.NUM_MEM_TO_DISK_MERGES, + TaskCounter.NUM_SHUFFLED_INPUTS, + TaskCounter.NUM_SKIPPED_INPUTS, + TaskCounter.REDUCE_INPUT_GROUPS, + TaskCounter.REDUCE_INPUT_RECORDS, + TaskCounter.SHUFFLE_BYTES, + TaskCounter.SHUFFLE_BYTES_DECOMPRESSED, + TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, + TaskCounter.SHUFFLE_BYTES_TO_DISK, + TaskCounter.SHUFFLE_BYTES_TO_MEM, + TaskCounter.SPILLED_RECORDS + ); + + int nonZeroCounters = 0; + // verify that the sum of the counter values for edges add up to the aggregated counter value. + for(TaskCounter c : countersToVerifyAgg) { + TezCounter aggregatedCounter = aggregatedGroup.findCounter(c.name(), false); + TezCounter input1Counter = input1Group.findCounter(c.name(), false); + TezCounter input2Counter = input2Group.findCounter(c.name(), false); + assertNotNull("aggregated counter cannot be null " + c.name(), aggregatedCounter); + assertNotNull("input1 counter cannot be null " + c.name(), input1Counter); + assertNotNull("input2 counter cannot be null " + c.name(), input2Counter); + + assertEquals("aggregated counter does not match sum of input counters " + c.name(), + aggregatedCounter.getValue(), input1Counter.getValue() + input2Counter.getValue()); + + if (aggregatedCounter.getValue() > 0) { + nonZeroCounters++; + } + } + + // ensure that at least one of the counters tested above were non-zero. + assertTrue("At least one of the counter should be non-zero. 
invalid test ", nonZeroCounters > 0); + + CounterGroup joinerOutputGroup = joinerCounters.getGroup( + TaskCounter.class.getSimpleName() + "_" + joinerVertexName + "_OUTPUT_" + joinOutputName); + String outputCounterName = TaskCounter.OUTPUT_RECORDS.name(); + TezCounter aggregateCounter = aggregatedGroup.findCounter(outputCounterName, false); + TezCounter joinerOutputCounter = joinerOutputGroup.findCounter(outputCounterName, false); + assertNotNull("aggregated counter cannot be null " + outputCounterName, aggregateCounter); + assertNotNull("output counter cannot be null " + outputCounterName, joinerOutputCounter); + assertTrue("counter value is zero. test is invalid", aggregateCounter.getValue() > 0); + assertEquals("aggregated counter does not match sum of output counters " + outputCounterName, + aggregateCounter.getValue(), joinerOutputCounter.getValue()); + } + + + @Test(timeout = 60000) public void testSortMergeJoinExampleDisableSplitGrouping() throws Exception { SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample(); sortMergeJoinExample.setConf(conf); @@ -982,4 +1087,83 @@ public class TestTezJobs { } } } + + private static class SortMergeJoinHelper extends SortMergeJoinExample { + private final TezClient tezClientInternal; + private DAGClient dagClient; + + public SortMergeJoinHelper(TezClient tezClient) { + this.tezClientInternal = tezClient; + } + + @Override + public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException, + InterruptedException, IOException { + tezClientInternal.waitTillReady(); + dagClient = tezClientInternal.submitDAG(dag); + Set<StatusGetOpts> getOpts = new HashSet<StatusGetOpts>(); + if (printCounters) { + getOpts.add(StatusGetOpts.GET_COUNTERS); + } + + DAGStatus dagStatus; + dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts); + + if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { + logger.info("DAG diagnostics: " + dagStatus.getDiagnostics()); + return -1; + } + return 0; + 
} + } + + private Set<String> generateSortMergeJoinInput(Path inPath1, Path inPath2) throws + IOException { + remoteFs.mkdirs(inPath1); + remoteFs.mkdirs(inPath2); + + Set<String> expectedResult = new HashSet<String>(); + FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file")); + FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file")); + + BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1)); + BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2)); + for (int i = 0; i < 20; i++) { + String term = "term" + i; + writer1.write(term); + writer1.newLine(); + if (i % 2 == 0) { + writer2.write(term); + writer2.newLine(); + expectedResult.add(term); + } + } + writer1.close(); + writer2.close(); + out1.close(); + out2.close(); + + return expectedResult; + } + + private void verifySortMergeJoinInput(Path outPath, Set<String> expectedResult) throws + IOException { + FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + assertEquals(1, statuses.length); + FSDataInputStream inStream = remoteFs.open(statuses[0].getPath()); + BufferedReader reader = new BufferedReader(new InputStreamReader(inStream)); + String line; + while ((line = reader.readLine()) != null) { + assertTrue(expectedResult.remove(line)); + } + reader.close(); + inStream.close(); + assertEquals(0, expectedResult.size()); + } + }
