Repository: tez
Updated Branches:
  refs/heads/master d2b9222fb -> c6a7d76ea
TEZ-3308. Add counters to capture input split length. Contributed by Harish Jaiprakash.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6a7d76e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6a7d76e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6a7d76e

Branch: refs/heads/master
Commit: c6a7d76eab86643cdb7d7537c3d5a4df4eb7387c
Parents: d2b9222
Author: Siddharth Seth <[email protected]>
Authored: Mon Jun 27 18:29:39 2016 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Mon Jun 27 18:29:39 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/common/counters/TaskCounter.java |  7 +++-
 .../org/apache/tez/mapreduce/input/MRInput.java | 40 ++++++++++++++++----
 .../tez/mapreduce/input/MultiMRInput.java       | 31 +++++++++++----
 .../apache/tez/mapreduce/input/TestMRInput.java | 11 +++++-
 .../tez/mapreduce/input/TestMultiMRInput.java   | 29 +++++++++-----
 6 files changed, 92 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26ff72c..1617c91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
   TEZ-3291. Optimize splits grouping when locality information is not available.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
@@ -67,6 +68,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
   TEZ-3304. TestHistoryParser fails with Hadoop 2.7.


http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 7dcdf8a..2f18bc6 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -84,7 +84,12 @@ public enum TaskCounter {
    *
    */
   INPUT_RECORDS_PROCESSED,
-
+
+  /**
+   * Number of bytes in the input split for a task; currently used by MRInput.
+   */
+  INPUT_SPLIT_LENGTH_BYTES,
+
   //
   /**
    * Represents the number of actual output records.
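
For context on how the new enum value is consumed: counters declared in
TaskCounter are created and updated through the TezCounters API, which is
what the MRInput and MultiMRInput changes below rely on. A minimal sketch
of incrementing and reading the new counter (the class name and the 128 MB
figure are illustrative, not part of this commit):

    import org.apache.tez.common.counters.TaskCounter;
    import org.apache.tez.common.counters.TezCounter;
    import org.apache.tez.common.counters.TezCounters;

    // Hypothetical standalone example; the real code obtains TezCounters
    // from the input context, as in getContext().getCounters() below.
    public class SplitLengthCounterSketch {
      public static void main(String[] args) {
        TezCounters counters = new TezCounters();
        TezCounter splitLength =
            counters.findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES);
        splitLength.increment(128L * 1024 * 1024); // e.g. one 128 MB split
        System.out.println(splitLength.getValue()); // prints 134217728
      }
    }
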
http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index e859058..af4b05c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -489,10 +490,16 @@ public class MRInput extends MRInputBase {
       TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext().getTaskIndex()];
       TaskSplitIndex splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
           thisTaskMetaInfo.getStartOffset());
+      long splitLength = -1;
       if (useNewApi) {
         org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils
             .getNewSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
                 .findCounter(TaskCounter.SPLIT_RAW_BYTES));
+        try {
+          splitLength = newInputSplit.getLength();
+        } catch (InterruptedException e) {
+          LOG.warn("Got interrupted while reading split length: ", e);
+        }
         mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(),
             inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
             getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
@@ -501,10 +508,15 @@ public class MRInput extends MRInputBase {
         org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils
             .getOldSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters()
                 .findCounter(TaskCounter.SPLIT_RAW_BYTES));
+        splitLength = oldInputSplit.getLength();
         mrReader = new MRReaderMapred(jobConf, oldInputSplit,
             getContext().getCounters(), inputRecordCounter,
             getContext());
       }
+      if (splitLength != -1) {
+        getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
+            .increment(splitLength);
+      }
     }
   } finally {
     rrLock.unlock();
@@ -650,24 +662,36 @@ public class MRInput extends MRInputBase {
     }
     Preconditions.checkState(initEvent != null, "InitEvent must be specified");
     MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
-    Object split = null;
+    Object splitObj = null;
+    long splitLength = -1;
     if (useNewApi) {
-      split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);
+      InputSplit split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf);
+      splitObj = split;
+      try {
+        splitLength = split.getLength();
+      } catch (InterruptedException e) {
+        LOG.warn("Thread interrupted while getting split length: ", e);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", NewSplit: "
-            + split);
+            split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
       }
     } else {
-      split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf);
+      org.apache.hadoop.mapred.InputSplit split =
+          MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf);
+      splitObj = split;
+      splitLength = split.getLength();
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", OldSplit: "
-            + split);
+            split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
       }
     }
-    mrReader.setSplit(split);
+    if (splitLength != -1) {
+      getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
+          .increment(splitLength);
+    }
+    mrReader.setSplit(splitObj);
     LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
   }


http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 2b60f29..efbeeaa 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.mapreduce.input.base.MRInputBase;
 import org.apache.tez.mapreduce.lib.MRInputUtils;
@@ -168,30 +169,44 @@ public class MultiMRInput extends MRInputBase {
       LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + eventCount.get());
     }
     MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
-    Object split = null;
     MRReader reader = null;
     JobConf localJobConf = new JobConf(jobConf);
+    long splitLength = -1;
     if (useNewApi) {
-      split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, localJobConf);
-      reader = new MRReaderMapReduce(localJobConf, (org.apache.hadoop.mapreduce.InputSplit) split,
+      InputSplit split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, localJobConf);
+      try {
+        splitLength = split.getLength();
+      } catch (InterruptedException e) {
+        LOG.warn("Got interrupted while reading split length: ", e);
+      }
+      reader = new MRReaderMapReduce(localJobConf, split,
           getContext().getCounters(), inputRecordCounter, getContext().getApplicationId()
              .getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext()
              .getApplicationId().getId(), getContext().getTaskIndex(), getContext()
              .getTaskAttemptNumber(), getContext());
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", NewSplit: " + split);
+            split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
       }
     } else {
-      split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
-      reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split,
+      org.apache.hadoop.mapred.InputSplit split =
+          MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf);
+      splitLength = split.getLength();
+      reader = new MRReaderMapred(localJobConf, split,
           getContext().getCounters(), inputRecordCounter,
           getContext());
       if (LOG.isDebugEnabled()) {
         LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
-            split.getClass().getName() + ", OldSplit: " + split);
+            split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
       }
     }
+    if (splitLength != -1) {
+      getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
+          .increment(splitLength);
+    }
     LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
     return reader;
   }


http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
index b878416..9109cd9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java
@@ -28,6 +28,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +39,8 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
@@ -138,10 +141,12 @@ public class TestMRInput {
     List<Event> events = new LinkedList<>();
     events.add(diEvent);
     mrInput.handleEvents(events);
+    TezCounter counter = mrInput.getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES);
+    assertEquals(TestInputSplit.length, counter.getValue());
     assertTrue(TestInputFormat.invoked.get());
   }
 
-
   /**
    * Test class to verify
    */
@@ -210,9 +215,11 @@ public class TestMRInput {
 
   public static class TestInputSplit implements InputSplit {
 
+    public static long length = Math.abs(new Random().nextLong());
+
     @Override
     public long getLength() throws IOException {
-      return 0;
+      return length;
     }
 
     @Override


http://git-wip-us.apache.org/repos/asf/tez/blob/c6a7d76e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 1733bfc..ab4a5d9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -34,6 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
@@ -141,8 +143,9 @@ public class TestMultiMRInput {
 
     List<Event> eventList = new ArrayList<Event>();
     String file1 = "file1";
+    AtomicLong file1Length = new AtomicLong();
     LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10);
+        10, file1Length);
     SequenceFileInputFormat<LongWritable, Text> format =
         new SequenceFileInputFormat<LongWritable, Text>();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -171,13 +174,16 @@ public class TestMultiMRInput {
         assertEquals(val, data1.remove(key));
       }
       try {
-        boolean hasNext = reader.next(); //should throw exception
+        reader.next(); //should throw exception
         fail();
       } catch(IOException e) {
         assertTrue(e.getMessage().contains("For usage, please refer to"));
       }
     }
     assertEquals(1, readerCount);
+    long counterValue = input.getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
+    assertEquals(file1Length.get(), counterValue);
   }
 
   @Test(timeout = 5000)
@@ -202,12 +208,14 @@ public class TestMultiMRInput {
 
     LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
     String file1 = "file1";
+    AtomicLong file1Length = new AtomicLong();
     LinkedHashMap<LongWritable, Text> data1 = createInputData(localFs, workDir, jobConf, file1, 0,
-        10);
+        10, file1Length);
     String file2 = "file2";
+    AtomicLong file2Length = new AtomicLong();
     LinkedHashMap<LongWritable, Text> data2 = createInputData(localFs, workDir, jobConf, file2, 10,
-        20);
+        20, file2Length);
     data.putAll(data1);
     data.putAll(data2);
 
@@ -245,12 +253,15 @@ public class TestMultiMRInput {
       }
 
       try {
-        boolean hasNext = reader.next(); //should throw exception
+        reader.next(); //should throw exception
         fail();
       } catch(IOException e) {
         assertTrue(e.getMessage().contains("For usage, please refer to"));
       }
     }
+    long counterValue = input.getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
+    assertEquals(file1Length.get() + file2Length.get(), counterValue);
     assertEquals(2, readerCount);
   }
 
@@ -273,7 +284,7 @@ public class TestMultiMRInput {
 
     List<Event> eventList = new ArrayList<Event>();
     String file1 = "file1";
-    createInputData(localFs, workDir, jobConf, file1, 0, 10);
+    createInputData(localFs, workDir, jobConf, file1, 0, 10, new AtomicLong());
     SequenceFileInputFormat<LongWritable, Text> format =
         new SequenceFileInputFormat<LongWritable, Text>();
     InputSplit[] splits = format.getSplits(jobConf, 1);
@@ -325,9 +336,8 @@ public class TestMultiMRInput {
   }
 
   public static LinkedHashMap<LongWritable, Text> createInputData(FileSystem fs, Path workDir,
-      JobConf job, String filename,
-      long startKey,
-      long numKeys) throws IOException {
+      JobConf job, String filename, long startKey, long numKeys, AtomicLong fileLength)
+      throws IOException {
     LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
     Path file = new Path(workDir, filename);
     LOG.info("Generating data at path: " + file);
@@ -346,6 +356,7 @@ public class TestMultiMRInput {
         writer.append(key, value);
         LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
       }
+      fileLength.set(writer.getLength());
     } finally {
       writer.close();
     }
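
With this change in, the per-task counter aggregates up to the vertex and
DAG level like any other TaskCounter. A hedged sketch of how a client might
read the aggregate after a DAG finishes: the helper class and method names
are illustrative only; the DAGClient/DAGStatus calls are existing Tez API,
and 'dagClient' is assumed to come from TezClient#submitDAG:

    import java.io.IOException;
    import java.util.EnumSet;

    import org.apache.tez.common.counters.TaskCounter;
    import org.apache.tez.dag.api.TezException;
    import org.apache.tez.dag.api.client.DAGClient;
    import org.apache.tez.dag.api.client.DAGStatus;
    import org.apache.tez.dag.api.client.StatusGetOpts;

    public class SplitLengthCounterReader {
      // Returns the DAG-level total of INPUT_SPLIT_LENGTH_BYTES; assumes
      // the DAG has completed so counters are final.
      public static long readSplitLengthBytes(DAGClient dagClient)
          throws IOException, TezException {
        DAGStatus status =
            dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS));
        return status.getDAGCounters()
            .findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES).getValue();
      }
    }
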
