Repository: tez Updated Branches: refs/heads/branch-0.7 f442c63f5 -> bdf5b69f4
TEZ-3317. Speculative execution starts too early due to 0 progress. Contributed by Kuhu Shukla. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0a6db289 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0a6db289 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0a6db289 Branch: refs/heads/branch-0.7 Commit: 0a6db2890089795f642dbc9ab2ea52fe18db28f6 Parents: f442c63 Author: Siddharth Seth <[email protected]> Authored: Wed Oct 26 11:04:08 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Oct 26 11:04:08 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/runtime/api/AbstractLogicalInput.java | 5 + .../tez/runtime/api/MergedLogicalInput.java | 5 + .../org/apache/tez/mapreduce/input/MRInput.java | 5 +- .../tez/mapreduce/input/MRInputLegacy.java | 4 + .../mapreduce/processor/map/MapProcessor.java | 57 ++++++-- .../processor/reduce/ReduceProcessor.java | 59 ++++++-- .../processor/map/TestMapProcessor.java | 142 +++++++++++++++++++ .../api/impl/TezProcessorContextImpl.java | 6 +- .../common/readers/UnorderedKVReader.java | 15 ++ .../common/shuffle/impl/ShuffleManager.java | 9 ++ .../input/ConcatenatedMergedKeyValueInput.java | 14 +- .../input/ConcatenatedMergedKeyValuesInput.java | 15 +- .../library/input/OrderedGroupedKVInput.java | 17 +++ .../input/OrderedGroupedMergedKVInput.java | 8 +- .../runtime/library/input/UnorderedKVInput.java | 4 + .../library/processor/SimpleProcessor.java | 40 +++++- .../library/processor/SleepProcessor.java | 45 +++++- .../processor/FilterByWordInputProcessor.java | 53 +++++-- .../processor/FilterByWordOutputProcessor.java | 1 + 20 files changed, 456 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/CHANGES.txt ---------------------------------------------------------------------- diff 
--git a/CHANGES.txt b/CHANGES.txt index 97b6292..e2eaf70 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3317. Speculative execution starts too early due to 0 progress. TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second. TEZ-3464. Fix findbugs warnings in tez-dag mainLoop http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java index dea79b7..4c95eb9 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java @@ -17,6 +17,7 @@ */ package org.apache.tez.runtime.api; +import java.io.IOException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -80,4 +81,8 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput public final InputContext getContext() { return inputContext; } + + public float getProgress() throws IOException, InterruptedException { + return 0.0f; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java index dedc902..3195a17 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java @@ -18,6 +18,7 
@@ package org.apache.tez.runtime.api; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,4 +93,8 @@ public abstract class MergedLogicalInput implements LogicalInput { * Used by the framework to inform the MergedInput that one of it's constituent Inputs is ready. */ public abstract void setConstituentInputIsReady(Input input); + + public float getProgress() throws IOException, InterruptedException { + return 0.0f; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index b68d135..46ac87f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -581,8 +581,9 @@ public class MRInput extends MRInputBase { } } - public float getProgress() throws IOException, InterruptedException { - return mrReader.getProgress(); + @Override + public float getProgress() throws IOException,InterruptedException { + return (mrReader != null) ? 
mrReader.getProgress() : 0.0f; } void processSplitEvent(InputDataInformationEvent event) http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java index e83c36a..9b5ed1c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java @@ -87,6 +87,10 @@ public class MRInputLegacy extends MRInput { return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader(); } + public float getProgress() throws IOException, InterruptedException { + return super.getProgress(); + } + @Private public InputSplit getOldInputSplit() { return (InputSplit) mrReader.getSplit(); http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java index 1a12a21..017acf8 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.output.MROutputLegacy; import 
org.apache.tez.mapreduce.processor.MRTask; import org.apache.tez.mapreduce.processor.MRTaskReporter; +import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; @@ -58,6 +61,35 @@ public class MapProcessor extends MRTask{ private static final Logger LOG = LoggerFactory.getLogger(MapProcessor.class); + protected Map<String, LogicalInput> inputs; + protected Map<String, LogicalOutput> outputs; + Timer progressTimer = new Timer(); + TimerTask progressTask = new TimerTask() { + + @Override + public void run() { + try { + float progSum = 0.0f; + if (inputs != null && inputs.size() != 0) { + for(LogicalInput input : inputs.values()) { + if (input instanceof AbstractLogicalInput) { + progSum += ((AbstractLogicalInput) input).getProgress(); + } + } + float progress = (1.0f) * progSum / inputs.size(); + getContext().setProgress(progress); + mrReporter.setProgress(progress); + } + } catch (IOException e) { + LOG.warn("Encountered IOException during Processor progress update" + + e.getMessage()); + } catch (InterruptedException e) { + LOG.warn("Encountered InterruptedException during Processor progress" + + "update" + e.getMessage()); + } + } + }; + public MapProcessor(ProcessorContext processorContext) { super(processorContext, true); } @@ -69,32 +101,35 @@ public class MapProcessor extends MRTask{ } public void close() throws IOException { - // TODO Auto-generated method stub + progressTimer.cancel(); } @Override - public void run(Map<String, LogicalInput> inputs, - Map<String, LogicalOutput> outputs) throws Exception { + public void run(Map<String, LogicalInput> _inputs, + Map<String, LogicalOutput> _outputs) throws Exception { + this.inputs = _inputs; + this.outputs = _outputs; LOG.info("Running map: " + processorContext.getUniqueIdentifier()); - for (LogicalInput input : inputs.values()) { + for (LogicalInput input : _inputs.values()) { 
input.start(); } - for (LogicalOutput output : outputs.values()) { + for (LogicalOutput output : _outputs.values()) { output.start(); } - if (inputs.size() != 1 - || outputs.size() != 1) { + if (_inputs.size() != 1 + || _outputs.size() != 1) { throw new IOException("Cannot handle multiple inputs or outputs" - + ", inputCount=" + inputs.size() - + ", outputCount=" + outputs.size()); + + ", inputCount=" + _inputs.size() + + ", outputCount=" + _outputs.size()); } - LogicalInput in = inputs.values().iterator().next(); - LogicalOutput out = outputs.values().iterator().next(); + LogicalInput in = _inputs.values().iterator().next(); + LogicalOutput out = _outputs.values().iterator().next(); initTask(out); + progressTimer.schedule(progressTask, 0, 100); // Sanity check if (!(in instanceof MRInputLegacy)) { http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java index 996cf84..8ec6091 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java @@ -22,6 +22,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.mapreduce.output.MROutputLegacy; import org.apache.tez.mapreduce.processor.MRTask; import org.apache.tez.mapreduce.processor.MRTaskReporter; +import org.apache.tez.runtime.api.AbstractLogicalInput; import 
org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; @@ -63,6 +66,35 @@ public class ReduceProcessor extends MRTask { private Counter reduceInputKeyCounter; private Counter reduceInputValueCounter; + protected Map<String, LogicalInput> inputs; + protected Map<String, LogicalOutput> outputs; + Timer progressTimer = new Timer(); + TimerTask progressTask = new TimerTask() { + + @Override + public void run() { + try { + float progSum = 0.0f; + if (inputs != null && inputs.size() != 0) { + for(LogicalInput input : inputs.values()) { + if (input instanceof AbstractLogicalInput) { + progSum += ((AbstractLogicalInput) input).getProgress(); + } + } + float progress = (1.0f) * progSum / inputs.size(); + getContext().setProgress(progress); + mrReporter.setProgress(progress); + } + } catch (IOException e) { + LOG.warn("Encountered IOException during Processor progress update" + + e.getMessage()); + } catch (InterruptedException e) { + LOG.warn("Encountered InterruptedException during Processor progress" + + "update" + e.getMessage()); + } + } + }; + public ReduceProcessor(ProcessorContext processorContext) { super(processorContext, false); } @@ -74,27 +106,28 @@ public class ReduceProcessor extends MRTask { } public void close() throws IOException { - // TODO Auto-generated method stub + progressTimer.cancel(); } @Override - public void run(Map<String, LogicalInput> inputs, - Map<String, LogicalOutput> outputs) throws Exception { - + public void run(Map<String, LogicalInput> _inputs, + Map<String, LogicalOutput> _outputs) throws Exception { + this.inputs = _inputs; + this.outputs = _outputs; LOG.info("Running reduce: " + processorContext.getUniqueIdentifier()); - if (outputs.size() <= 0 || outputs.size() > 1) { - throw new IOException("Invalid number of outputs" - + ", outputCount=" + outputs.size()); + if (_outputs.size() <= 0 || _outputs.size() > 1) { + throw new IOException("Invalid number of _outputs" + 
+ ", outputCount=" + _outputs.size()); } - if (inputs.size() <= 0 || inputs.size() > 1) { - throw new IOException("Invalid number of inputs" - + ", inputCount=" + inputs.size()); + if (_inputs.size() <= 0 || _inputs.size() > 1) { + throw new IOException("Invalid number of _inputs" + + ", inputCount=" + _inputs.size()); } - LogicalInput in = inputs.values().iterator().next(); + LogicalInput in = _inputs.values().iterator().next(); in.start(); List<Input> pendingInputs = new LinkedList<Input>(); @@ -102,11 +135,11 @@ public class ReduceProcessor extends MRTask { processorContext.waitForAllInputsReady(pendingInputs); LOG.info("Input is ready for consumption. Starting Output"); - LogicalOutput out = outputs.values().iterator().next(); + LogicalOutput out = _outputs.values().iterator().next(); out.start(); initTask(out); - + progressTimer.schedule(progressTask, 0, 100); this.statusUpdate(); Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf); http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 70f8763..53b8c46 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -21,7 +21,22 @@ package org.apache.tez.mapreduce.processor.map; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.FSDataOutputStream; +import 
org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.split.JobSplit; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -69,6 +84,9 @@ public class TestMapProcessor { private static JobConf defaultConf = new JobConf(); private static FileSystem localFs = null; private static Path workDir = null; + static float progressUpdate = 0.0f; + final private static FsPermission JOB_FILE_PERMISSION = FsPermission + .createImmutable((short) 0644); static { try { defaultConf.set("fs.defaultFS", "file:///"); @@ -184,4 +202,128 @@ public class TestMapProcessor { } reader.close(); } + + @Test(timeout = 10000) + public void testMapProcessorProgress() throws Exception { + String dagName = "mrdag0"; + String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName(); + JobConf jobConf = new JobConf(defaultConf); + setUpJobConf(jobConf); + + MRHelpers.translateMRConfToTez(jobConf); + jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0); + + jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false); + + jobConf.set(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, new Path(workDir, + "localized-resources").toUri().toString()); + + Path mapInput = new Path(workDir, "map0"); + + + generateInputSplit(localFs, workDir, jobConf, mapInput); + + InputSpec mapInputSpec = new InputSpec("NullSrcVertex", + InputDescriptor.create(MRInputLegacy.class.getName()) + .setUserPayload(UserPayload.create(ByteBuffer.wrap( + MRRuntimeProtos.MRInputUserPayloadProto.newBuilder() + .setConfigurationBytes(TezUtils.createByteStringFromConf + 
(jobConf)).build() + .toByteArray()))), + 1); + OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", + OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName()) + .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1); + + final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask + (localFs, workDir, jobConf, 0, + new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName, + Collections.singletonList(mapInputSpec), + Collections.singletonList(mapOutputSpec)); + + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + Thread monitorProgress = new Thread(new Runnable() { + @Override + public void run() { + float prog = task.getProgress(); + if(prog > 0.0 && prog < 1.0) + progressUpdate = prog; + } + }); + + task.initialize(); + scheduler.scheduleAtFixedRate(monitorProgress, 0, 10, + TimeUnit.MILLISECONDS); + task.run(); + Assert.assertTrue("Progress Updates should be captured!", + progressUpdate != 0.0f); + task.close(); + } + + public static void generateInputSplit(FileSystem fs, Path workDir, + JobConf jobConf, Path mapInput) + throws IOException { + jobConf.setInputFormat(SequenceFileInputFormat.class); + FileInputFormat.setInputPaths(jobConf, workDir); + + LOG.info("Generating data at path: " + mapInput); + // create a file with length entries + SequenceFile.Writer writer = + SequenceFile.createWriter(fs, jobConf, mapInput, + LongWritable.class, Text.class); + try { + Random r = new Random(System.currentTimeMillis()); + LongWritable key = new LongWritable(); + Text value = new Text(); + for (int i = 100000; i > 0; i--) { + key.set(r.nextInt(1000)); + value.set(Integer.toString(i)); + writer.append(key, value); + LOG.info("<k, v> : <" + key.get() + ", " + value + ">"); + } + } finally { + writer.close(); + } + + SequenceFileInputFormat<LongWritable, Text> format = + new SequenceFileInputFormat<LongWritable, Text>(); + InputSplit[] splits = format.getSplits(jobConf, 1); + 
System.err.println("#split = " + splits.length + " ; " + + "#locs = " + splits[0].getLocations().length + "; " + + "loc = " + splits[0].getLocations()[0] + "; " + + "off = " + splits[0].getLength() + "; " + + "file = " + ((FileSplit)splits[0]).getPath()); + writeSplitFiles(fs, jobConf, splits[0]); + } + + private static void writeSplitFiles(FileSystem fs, JobConf conf, + InputSplit split) throws IOException { + Path jobSplitFile = new Path(conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, + MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR_DEFAULT), MRJobConfig.JOB_SPLIT); + LOG.info("Writing split to: " + jobSplitFile); + FSDataOutputStream out = FileSystem.create(fs, jobSplitFile, + new FsPermission(JOB_FILE_PERMISSION)); + + long offset = out.getPos(); + Text.writeString(out, split.getClass().getName()); + split.write(out); + out.close(); + + String[] locations = split.getLocations(); + + JobSplit.SplitMetaInfo info = null; + info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); + + Path jobSplitMetaInfoFile = new Path( + conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR), + MRJobConfig.JOB_SPLIT_METAINFO); + + FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile, + new FsPermission(JOB_FILE_PERMISSION)); + outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER); + WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION); + WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written + info.write(outMeta); + outMeta.close(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index 0c3283d..fb13d5e 100644 
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -93,8 +93,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce @Override public void setProgress(float progress) { - runtimeTask.setProgress(progress); - notifyProgress(); + if (runtimeTask.getProgress() != progress) { + runtimeTask.setProgress(progress); + notifyProgress(); + } } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index a7fd7c8..9526bc3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -72,6 +72,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { // TODO Remove this once per I/O counters are separated properly. Relying on // the counter at the moment will generate aggregate numbers. private int numRecordsRead = 0; + private long totalBytesRead = 0; + private long totalFileBytes = 0; public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf, @@ -145,6 +147,17 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { return value; } + public float getProgress() { + int numInputs = shuffleManager.getNumInputs(); + if (totalFileBytes > 0 && numInputs > 0) { + return ((1.0f) * (totalBytesRead + ((currentReader != null) ? 
currentReader.bytesRead : + 0l)) / + totalFileBytes) * ( + shuffleManager.getNumCompletedInputs().floatValue() / + (1.0f * numInputs)); + } + return 0l; + } /** * Tries reading the next key and value from the current reader. * @return true if the current reader has more records @@ -176,6 +189,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { */ private boolean moveToNextInput() throws IOException { if (currentReader != null) { // Close the current reader. + totalBytesRead += currentReader.bytesRead; currentReader.close(); /** * clear reader explicitly. Otherwise this could point to stale reference when next() is @@ -195,6 +209,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { return false; // No more inputs } else { currentReader = openIFileReader(currentFetchedInput); + totalFileBytes += currentReader.getLength(); return true; } } http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 4c78711..fbe62ad 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -855,6 +855,15 @@ public class ShuffleManager implements FetcherCallback { } while (input instanceof NullFetchedInput); return input; } + + public int getNumInputs() { + return numInputs; + } + + public AtomicInteger getNumCompletedInputs() { + return numCompletedInputs; + } + /////////////////// End of methods for walking the available inputs 
http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java index 728121a..3d1805a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java @@ -37,6 +37,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader; */ @Public public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { + private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader; public ConcatenatedMergedKeyValueInput(MergedInputContext context, List<Input> inputs) { @@ -81,7 +82,10 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { public Object getCurrentValue() throws IOException { return currentReader.getCurrentValue(); } - + + public float getProgress() { + return (1.0f)*(currentReaderIndex + 1)/getInputs().size(); + } } /** @@ -90,11 +94,17 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { */ @Override public KeyValueReader getReader() throws Exception { - return new ConcatenatedMergedKeyValueReader(); + concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader(); + return concatenatedMergedKeyValueReader; } @Override public void setConstituentInputIsReady(Input input) { informInputReady(); } + + @Override + public float getProgress() throws IOException, InterruptedException { + return concatenatedMergedKeyValueReader.getProgress(); + } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java index f9a5959..d4ff0bc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java @@ -39,6 +39,8 @@ import org.apache.tez.runtime.library.api.KeyValuesReader; @Public public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { + private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader; + public ConcatenatedMergedKeyValuesInput(MergedInputContext context, List<Input> inputs) { super(context, inputs); @@ -82,7 +84,10 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { public Iterable<Object> getCurrentValues() throws IOException { return currentReader.getCurrentValues(); } - + + public float getProgress() { + return (1.0f)*(currentReaderIndex + 1)/getInputs().size(); + } } /** @@ -91,11 +96,17 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { */ @Override public KeyValuesReader getReader() throws Exception { - return new ConcatenatedMergedKeyValuesReader(); + concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader(); + return concatenatedMergedKeyValuesReader; } @Override public void setConstituentInputIsReady(Input input) { informInputReady(); } + + @Override + public float getProgress() throws IOException, InterruptedException { + return concatenatedMergedKeyValuesReader.getProgress(); + } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 40db1c4..18c765a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -83,6 +83,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { private TezCounter inputKeyCounter; private TezCounter inputValueCounter; + private TezCounter shuffledInputs; private final AtomicBoolean isStarted = new AtomicBoolean(false); @@ -111,6 +112,8 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { this.inputKeyCounter = getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS); this.inputValueCounter = getContext().getCounters().findCounter( TaskCounter.REDUCE_INPUT_RECORDS); + this.shuffledInputs = getContext().getCounters().findCounter( + TaskCounter.NUM_SHUFFLED_INPUTS); this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs()); return Collections.emptyList(); @@ -255,6 +258,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { } @Override + public float getProgress() throws IOException, InterruptedException { + int totalInputs = getNumPhysicalInputs(); + if (totalInputs != 0) { + synchronized (this) { + return ((0.5f) * this.shuffledInputs.getValue() / totalInputs) + + ((rawIter != null) ? 
+ ((0.5f) * rawIter.getProgress().getProgress()) : 0.0f); + } + } else { + return 0.0f; + } + } + + @Override public void handleEvents(List<Event> inputEvents) throws IOException { Shuffle shuffleLocalRef; synchronized (this) { http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java index 2345bbb..5d6668d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java @@ -250,5 +250,11 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput { } } } - + public float getProgress() throws IOException, InterruptedException { + float totalProgress = 0.0f; + for(Input input : getInputs()) { + totalProgress += ((OrderedGroupedKVInput)input).getProgress(); + } + return (1.0f) * totalProgress/getInputs().size(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 2cb317a..d893910 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -287,4 +287,8 @@ public class UnorderedKVInput 
extends AbstractLogicalInput { return Collections.unmodifiableSet(confKeys); } + @Override + public float getProgress() { + return kvReader.getProgress(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java index 725f785..66c3625 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java @@ -17,17 +17,23 @@ */ package org.apache.tez.runtime.library.processor; +import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.Processor; import org.apache.tez.runtime.api.ProcessorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements an {@link AbstractLogicalIOProcessor} and provides empty @@ -38,9 +44,37 @@ import org.apache.tez.runtime.api.ProcessorContext; @Public @Evolving public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { + private static final Logger LOG = + LoggerFactory.getLogger(SimpleProcessor.class); protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; 
+ Timer progressTimer = new Timer(); + TimerTask progressTask = new TimerTask() { + + @Override + public void run() { + try { + float progSum = 0.0f; + if (getInputs() != null && !getInputs().isEmpty()) { + for(LogicalInput input : getInputs().values()) { + if (input instanceof AbstractLogicalInput) { + progSum += ((AbstractLogicalInput) input).getProgress(); + } + } + float progress = (1.0f) * progSum / getInputs().size(); + getContext().setProgress(progress); + } + } catch (IOException e) { + LOG.warn("Encountered IOException during Processor progress update: " + + e.getMessage()); + } catch (InterruptedException e) { + LOG.warn("Encountered InterruptedException during Processor progress" + + " update: " + e.getMessage()); + } + } + }; + public SimpleProcessor(ProcessorContext context) { super(context); } @@ -72,6 +106,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { for (LogicalInput input : getInputs().values()) { input.start(); } + progressTimer.schedule(progressTask, 0, 100); } if (getOutputs() != null) { for (LogicalOutput output : getOutputs().values()) { @@ -101,7 +136,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { @Override public void close() throws Exception { - + progressTimer.cancel(); } public Map<String, LogicalInput> getInputs() { @@ -112,4 +147,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { return outputs; } + public Timer getProgressTimer() { + return progressTimer; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java index 1122bbe..c58e338 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java @@ -18,17 +18,21 @@ package org.apache.tez.runtime.library.processor; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.util.List; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import com.google.common.base.Charsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; @@ -47,6 +51,34 @@ public class SleepProcessor extends AbstractLogicalIOProcessor { private static final Logger LOG = LoggerFactory.getLogger(SleepProcessor.class); private int timeToSleepMS; + protected Map<String, LogicalInput> inputs; + protected Map<String, LogicalOutput> outputs; + + Timer progressTimer = new Timer(); + TimerTask progressTask = new TimerTask() { + + @Override + public void run() { + try { + float progSum = 0.0f; + if (inputs != null && !inputs.isEmpty()) { + for(LogicalInput input : inputs.values()) { + if (input instanceof AbstractLogicalInput) { + progSum += ((AbstractLogicalInput) input).getProgress(); + } + } + float progress = (1.0f) * progSum / inputs.size(); + getContext().setProgress(progress); + } + } catch (IOException e) { + LOG.warn("Encountered IOException during Processor progress update: " + + e.getMessage()); + } catch (InterruptedException e) { + LOG.warn("Encountered InterruptedException during Processor progress" + + " update: " + e.getMessage()); + } + } + }; public SleepProcessor(ProcessorContext context) { super(context); @@ -69,14 +101,17 @@ 
public class SleepProcessor extends AbstractLogicalIOProcessor { } @Override - public void run(Map<String, LogicalInput> inputs, - Map<String, LogicalOutput> outputs) throws Exception { + public void run(Map<String, LogicalInput> _inputs, + Map<String, LogicalOutput> _outputs) throws Exception { + inputs = _inputs; + outputs = _outputs; LOG.info("Running the Sleep Processor, sleeping for " + timeToSleepMS + " ms"); - for (LogicalInput input : inputs.values()) { + for (LogicalInput input : _inputs.values()) { input.start(); } - for (LogicalOutput output : outputs.values()) { + progressTimer.schedule(progressTask, 0, 100); + for (LogicalOutput output : _outputs.values()) { output.start(); } try { @@ -93,7 +128,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor { @Override public void close() throws Exception { - // Nothing to cleanup + progressTimer.cancel(); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java index 884808e..bd8490b 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java @@ -18,8 +18,11 @@ package org.apache.tez.mapreduce.examples.processor; +import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +35,7 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import 
org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; @@ -46,6 +50,33 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { private static final Logger LOG = LoggerFactory.getLogger(FilterByWordInputProcessor.class); private String filterWord; + protected Map<String, LogicalInput> inputs; + protected Map<String, LogicalOutput> outputs; + Timer progressTimer = new Timer(); + TimerTask progressTask = new TimerTask() { + + @Override + public void run() { + try { + float progSum = 0.0f; + if (inputs != null && !inputs.isEmpty()) { + for(LogicalInput input : inputs.values()) { + if (input instanceof AbstractLogicalInput) { + progSum += ((AbstractLogicalInput) input).getProgress(); + } + } + float progress = (1.0f) * progSum / inputs.size(); + getContext().setProgress(progress); + } + } catch (IOException e) { + LOG.warn("Encountered IOException during Processor progress update: " + + e.getMessage()); + } catch (InterruptedException e) { + LOG.warn("Encountered InterruptedException during Processor progress" + + " update: " + e.getMessage()); + } + } + }; public FilterByWordInputProcessor(ProcessorContext context) { super(context); @@ -69,38 +100,40 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { @Override public void close() throws Exception { - LOG.info("Broadcast Processor closing. 
Nothing to do"); + progressTimer.cancel(); } @Override - public void run(Map<String, LogicalInput> inputs, - Map<String, LogicalOutput> outputs) throws Exception { + public void run(Map<String, LogicalInput> _inputs, + Map<String, LogicalOutput> _outputs) throws Exception { + this.inputs = _inputs; + this.outputs = _outputs; - if (inputs.size() != 1) { + if (_inputs.size() != 1) { throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input"); } - if (outputs.size() != 1) { + if (_outputs.size() != 1) { throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single output"); } - for (LogicalInput input : inputs.values()) { + for (LogicalInput input : _inputs.values()) { input.start(); } - for (LogicalOutput output : outputs.values()) { + for (LogicalOutput output : _outputs.values()) { output.start(); } - LogicalInput li = inputs.values().iterator().next(); + LogicalInput li = _inputs.values().iterator().next(); if (! (li instanceof MRInput)) { throw new IllegalStateException("FilterByWordInputProcessor processor can only work with MRInput"); } - LogicalOutput lo = outputs.values().iterator().next(); + LogicalOutput lo = _outputs.values().iterator().next(); if (! 
(lo instanceof UnorderedKVOutput)) { throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput"); } - + progressTimer.schedule(progressTask, 0, 100); MRInputLegacy mrInput = (MRInputLegacy) li; mrInput.init(); UnorderedKVOutput kvOutput = (UnorderedKVOutput) lo; http://git-wip-us.apache.org/repos/asf/tez/blob/0a6db289/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java index 15c17fc..5872527 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java @@ -51,6 +51,7 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor { @Override public void close() throws Exception { LOG.info("Broadcast Output Processor closing. Nothing to do"); + getProgressTimer().cancel(); } @Override
