TEZ-2918. Make progress notifications in IOs (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ec498d8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ec498d8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ec498d8 Branch: refs/heads/master Commit: 5ec498d8f5b6cecf4eaee8f995ea2cf9ca2acfcc Parents: 344a8cc Author: Bikas Saha <[email protected]> Authored: Fri Nov 13 05:54:09 2015 -0800 Committer: Bikas Saha <[email protected]> Committed: Fri Nov 13 05:54:09 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/runtime/api/MergedInputContext.java | 5 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +- .../org/apache/tez/mapreduce/input/MRInput.java | 10 +- .../tez/mapreduce/input/MultiMRInput.java | 4 +- .../org/apache/tez/mapreduce/lib/MRReader.java | 16 ++ .../tez/mapreduce/lib/MRReaderMapReduce.java | 10 +- .../tez/mapreduce/lib/MRReaderMapred.java | 11 +- .../apache/tez/mapreduce/output/MROutput.java | 1 + .../apache/tez/mapreduce/input/TestMRInput.java | 3 + .../tez/mapreduce/input/TestMultiMRInput.java | 4 + .../tez/mapreduce/lib/TestKVReadersWithMR.java | 12 +- .../tez/mapreduce/output/TestMROutput.java | 161 +++++++++++++++++++ .../runtime/LogicalIOProcessorRuntimeTask.java | 2 +- .../org/apache/tez/runtime/RuntimeTask.java | 9 +- .../api/impl/TezMergedInputContextImpl.java | 11 +- .../runtime/api/impl/TezTaskContextImpl.java | 2 +- .../tez/runtime/TestInputReadyTracker.java | 6 +- .../common/readers/UnorderedKVReader.java | 8 +- .../library/common/shuffle/ShuffleUtils.java | 1 + .../common/shuffle/impl/ShuffleManager.java | 2 + .../shuffle/orderedgrouped/MergeManager.java | 35 ++-- .../common/shuffle/orderedgrouped/Shuffle.java | 4 +- .../orderedgrouped/ShuffleScheduler.java | 3 +- .../common/sort/impl/ExternalSorter.java | 11 +- .../common/sort/impl/PipelinedSorter.java | 10 +- .../common/sort/impl/dflt/DefaultSorter.java | 10 +- .../writers/UnorderedPartitionedKVWriter.java | 9 +- .../input/ConcatenatedMergedKeyValueInput.java | 2 + .../input/ConcatenatedMergedKeyValuesInput.java | 2 + .../library/input/OrderedGroupedKVInput.java | 8 +- .../input/OrderedGroupedMergedKVInput.java | 8 +- .../runtime/library/input/UnorderedKVInput.java | 3 +- .../common/readers/TestUnorderedKVReader.java | 5 +- .../orderedgrouped/TestMergeManager.java | 3 + .../orderedgrouped/TestShuffleScheduler.java | 2 + .../common/sort/impl/TestPipelinedSorter.java | 38 ++--- .../sort/impl/dflt/TestDefaultSorter.java | 2 + .../TestUnorderedPartitionedKVWriter.java | 4 +- .../input/TestSortedGroupedMergedInput.java | 26 +-- 40 files changed, 383 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3ce1640..52c73da 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2679. Admin forms of launch env settings ALL CHANGES: + TEZ-2918. Make progress notifications in IOs TEZ-2940. Invalid shuffle max slow start setting causes vertex to hang indefinitely TEZ-2930. Tez UI: Parent controller is not polling at times TEZ-1670. Add tests for all converter functions in HistoryEventTimelineConversion. http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java index 41c519b..65bb087 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java @@ -42,6 +42,11 @@ public interface MergedInputContext { public void inputIsReady(); /** + * Inform the framework that progress has been made + */ + public void notifyProgress(); + + /** * Get the work directories for the Input * @return an array of work dirs */ http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 27eb69b..bfd1634 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -193,6 +193,7 @@ public class TaskAttemptImpl implements TaskAttempt, org.apache.tez.runtime.api.impl.TaskStatistics statistics; long lastNotifyProgressTimestamp = 0; + private final long hungIntervalMax; // Used to store locality information when Set<String> taskHosts = new HashSet<String>(); @@ -488,6 +489,10 @@ public class TaskAttemptImpl implements TaskAttempt, this.taskResource = resource; this.containerContext = containerContext; this.leafVertex = leafVertex; + this.hungIntervalMax = conf.getLong( + TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, + TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT); + } @Override @@ -1378,14 +1383,11 @@ public class TaskAttemptImpl implements TaskAttempt, ta.lastNotifyProgressTimestamp = ta.clock.getTime(); } else { long currTime = ta.clock.getTime(); - long hungIntervalMax = ta.conf.getLong( - TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, - TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT); - if (hungIntervalMax > 0 && - currTime - ta.lastNotifyProgressTimestamp > hungIntervalMax) { + if (ta.hungIntervalMax > 0 && + currTime - ta.lastNotifyProgressTimestamp > ta.hungIntervalMax) { // task is hung String diagnostics = "Attempt failed because it appears to make no progress for " + - hungIntervalMax + "ms"; + ta.hungIntervalMax + "ms"; LOG.info(diagnostics + " " + ta.getID()); // send event that will fail this attempt ta.sendEvent( http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 93161cb..b68d135 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -461,9 +461,10 @@ public class MRInput extends MRInputBase { mrReader = new MRReaderMapReduce(jobConf, getContext().getCounters(), inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(), getContext() .getTaskVertexIndex(), getContext().getApplicationId().getId(), getContext() - .getTaskIndex(), getContext().getTaskAttemptNumber()); + .getTaskIndex(), getContext().getTaskAttemptNumber(), getContext()); } else { - mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter); + mrReader = new MRReaderMapred(jobConf, getContext().getCounters(), inputRecordCounter, + getContext()); } } else { TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf); @@ -477,14 +478,14 @@ public class MRInput extends MRInputBase { mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(), inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), - getContext().getTaskIndex(), getContext().getTaskAttemptNumber()); + getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), getContext()); } else { org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils .getOldSplitDetailsFromDisk(splitMetaInfo, jobConf, getContext().getCounters() .findCounter(TaskCounter.SPLIT_RAW_BYTES)); mrReader = new MRReaderMapred(jobConf, oldInputSplit, getContext().getCounters(), - inputRecordCounter); + inputRecordCounter, getContext()); } } } finally { @@ -508,6 +509,7 @@ public class MRInput extends MRInputBase { return new KeyValueReader() { @Override public boolean next() throws IOException { + getContext().notifyProgress(); return false; } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index 4a792dc..2b60f29 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java @@ -177,7 +177,7 @@ public class MultiMRInput extends MRInputBase { getContext().getCounters(), inputRecordCounter, getContext().getApplicationId() .getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext() .getApplicationId().getId(), getContext().getTaskIndex(), getContext() - .getTaskAttemptNumber()); + .getTaskAttemptNumber(), getContext()); if (LOG.isDebugEnabled()) { LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: " + split); @@ -186,7 +186,7 @@ public class MultiMRInput extends MRInputBase { } else { split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf); reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split, - getContext().getCounters(), inputRecordCounter); + getContext().getCounters(), inputRecordCounter, getContext()); if (LOG.isDebugEnabled()) { LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: " + split); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java index 8a20827..aa35fec 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java @@ -21,14 +21,30 @@ package org.apache.tez.mapreduce.lib; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.KeyValueReader; @Private public abstract class MRReader extends KeyValueReader { + + private final InputContext context; + public abstract void setSplit(Object split) throws IOException; public abstract boolean isSetup(); public abstract float getProgress() throws IOException, InterruptedException; public abstract void close() throws IOException; public abstract Object getSplit(); public abstract Object getRecordReader(); + + public MRReader(InputContext context) { + this.context = context; + } + + protected final void notifyProgress() { + context.notifyProgress(); + } + + protected final void notifyDone() { + context.notifyProgress(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java index 5fc3e49..10b871e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java @@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.lib; import java.io.IOException; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,15 +52,16 @@ public class MRReaderMapReduce extends MRReader { private boolean setupComplete = false; public MRReaderMapReduce(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter, - long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber) + long clusterId, int vertexIndex, int appId, int taskIndex, int taskAttemptNumber, InputContext context) throws IOException { this(jobConf, null, tezCounters, inputRecordCounter, clusterId, vertexIndex, appId, taskIndex, - taskAttemptNumber); + taskAttemptNumber, context); } public MRReaderMapReduce(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters, TezCounter inputRecordCounter, long clusterId, int vertexIndex, int appId, int taskIndex, - int taskAttemptNumber) throws IOException { + int taskAttemptNumber, InputContext context) throws IOException { + super(context); this.inputRecordCounter = inputRecordCounter; this.taskAttemptContext = new TaskAttemptContextImpl(jobConf, tezCounters, clusterId, vertexIndex, appId, taskIndex, taskAttemptNumber, true, null); @@ -121,9 +123,11 @@ public class MRReaderMapReduce extends MRReader { } if (hasNext) { inputRecordCounter.increment(1); + notifyProgress(); } else { hasCompletedProcessing(); completedProcessing = true; + notifyDone(); } return hasNext; } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java index 1bf71f6..d81debb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java @@ -33,6 +33,7 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.mapreduce.hadoop.mapred.MRReporter; import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.runtime.api.InputContext; import com.google.common.base.Preconditions; @@ -56,13 +57,15 @@ public class MRReaderMapred extends MRReader { private boolean setupComplete = false; - public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter) + public MRReaderMapred(JobConf jobConf, TezCounters tezCounters, TezCounter inputRecordCounter, + InputContext context) throws IOException { - this(jobConf, null, tezCounters, inputRecordCounter); + this(jobConf, null, tezCounters, inputRecordCounter, context); } public MRReaderMapred(JobConf jobConf, InputSplit inputSplit, TezCounters tezCounters, - TezCounter inputRecordCounter) throws IOException { + TezCounter inputRecordCounter, InputContext context) throws IOException { + super(context); this.jobConf = jobConf; this.tezCounters = tezCounters; this.inputRecordCounter = inputRecordCounter; @@ -113,9 +116,11 @@ public class MRReaderMapred extends MRReader { boolean hasNext = recordReader.next(key, value); if (hasNext) { inputRecordCounter.increment(1); + notifyProgress(); } else { hasCompletedProcessing(); completedProcessing = true; + notifyDone(); } // The underlying reader does not throw InterruptedExceptions. Cannot convert to an // IOInterruptedException without checking the interrupt flag on each request, which is also http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 12e5092..ec83bf5 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -510,6 +510,7 @@ public class MROutput extends AbstractLogicalOutput { oldRecordWriter.write(key, value); } outputRecordCounter.increment(1); + getContext().notifyProgress(); } }; } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java index 50114b9..448b90c 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMRInput.java @@ -19,6 +19,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.LinkedList; @@ -57,6 +59,7 @@ public class TestMRInput { mrInput.start(); assertFalse(mrInput.getReader().next()); + verify(inputContext, times(1)).notifyProgress(); List<Event> events = new LinkedList<>(); try { http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index 80e3e77..db5643e 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.ArrayList; @@ -155,9 +157,11 @@ public class TestMultiMRInput { input.handleEvents(eventList); int readerCount = 0; + int recordCount = 0; for (KeyValueReader reader : input.getKeyValueReaders()) { readerCount++; while (reader.next()) { + verify(inputContext, times(++recordCount) ).notifyProgress(); if (data1.size() == 0) { fail("Found more records than expected"); } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java index 65f5ad0..dad18de 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java @@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.api.InputContext; import org.junit.Before; import org.junit.Test; @@ -32,6 +33,9 @@ import java.io.IOException; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class TestKVReadersWithMR { @@ -60,12 +64,14 @@ public class TestKVReadersWithMR { } public void testWithSpecificNumberOfKV(int kvPairs) throws IOException { - MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter); + InputContext mockContext = mock(InputContext.class); + MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext); reader.recordReader = new DummyRecordReader(kvPairs); int records = 0; while (reader.next()) { records++; + verify(mockContext, times(records)).notifyProgress(); } assertTrue(kvPairs == records); @@ -80,13 +86,15 @@ public class TestKVReadersWithMR { } public void testWithSpecificNumberOfKV_MapReduce(int kvPairs) throws IOException { + InputContext mockContext = mock(InputContext.class); MRReaderMapReduce reader = new MRReaderMapReduce(conf, counters, inputRecordCounter, -1, 1, - 10, 20, 30); + 10, 20, 30, mockContext); reader.recordReader = new DummyRecordReaderMapReduce(kvPairs); int records = 0; while (reader.next()) { records++; + verify(mockContext, times(records)).notifyProgress(); } assertTrue(kvPairs == records); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index b898fe0..0129a8b 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -20,21 +20,52 @@ package org.apache.tez.mapreduce.output; import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.HashMap; +import java.util.List; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.mapreduce.TestUmbilical; +import org.apache.tez.mapreduce.TezTestUtils; import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezUmbilical; +import org.apache.tez.runtime.library.api.KeyValueWriter; +import org.apache.tez.runtime.library.processor.SimpleProcessor; +import org.junit.Ignore; import org.junit.Test; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; + public class TestMROutput { @@ -144,4 +175,134 @@ public class TestMROutput { when(outputContext.getCounters()).thenReturn(new TezCounters()); return outputContext; } + + public static LogicalIOProcessorRuntimeTask createLogicalTask( + Configuration conf, + TezUmbilical umbilical, String dagName, + String vertexName) throws Exception { + ProcessorDescriptor procDesc = ProcessorDescriptor.create(TestProcessor.class.getName()); + List<InputSpec> inputSpecs = Lists.newLinkedList(); + List<OutputSpec> outputSpecs = Lists.newLinkedList(); + outputSpecs.add(new OutputSpec("Null", + MROutput.createConfigBuilder(conf, TestOutputFormat.class).build().getOutputDescriptor(), 1)); + + TaskSpec taskSpec = new TaskSpec( + TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0), + dagName, vertexName, -1, + procDesc, + inputSpecs, + outputSpecs, null); + + FileSystem fs = FileSystem.getLocal(conf); + Path workDir = + new Path(new Path(System.getProperty("test.build.data", "/tmp")), + "TestMapOutput").makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask( + taskSpec, + 0, + conf, + new String[] {workDir.toString()}, + umbilical, + null, + new HashMap<String, String>(), + HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), + Runtime.getRuntime().maxMemory(), true); + return task; + } + + public static class TestOutputCommitter extends OutputCommitter { + + @Override + public void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + } + + } + + public static class TestOutputFormat extends OutputFormat<String, String> { + public static class TestRecordWriter extends RecordWriter<String, String> { + Writer writer; + boolean doWrite; + TestRecordWriter(boolean write) throws IOException { + this.doWrite = write; + if (doWrite) { + File f = File.createTempFile("test", null); + f.deleteOnExit(); + writer = new BufferedWriter(new FileWriter(f)); + } + } + + @Override + public void write(String key, String value) throws IOException, InterruptedException { + if (doWrite) { + writer.write(key); + writer.write(value); + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + writer.close(); + } + + } + + @Override + public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new TestRecordWriter(true); + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new TestOutputCommitter(); + } + } + + public static class TestProcessor extends SimpleProcessor { + public TestProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + KeyValueWriter writer = (KeyValueWriter) getOutputs().values().iterator().next().getWriter(); + for (int i=0; i<1000000; ++i) { + writer.write("key", "value"); + } + } + + } + + @Ignore + @Test + public void testPerf() throws Exception { + LogicalIOProcessorRuntimeTask task = createLogicalTask(new Configuration(), + new TestUmbilical(), "dag", "vertex"); + task.initialize(); + task.run(); + task.close(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 4b00c97..6b9b016 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -535,7 +535,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } MergedInputContext mergedInputContext = new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(), - groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs); + groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs, this); List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size()); for (String groupVertex : groupInputSpec.getGroupVertices()) { inputs.add(inputsMap.get(groupVertex)); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 23e57b1..529dde0 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -54,7 +54,7 @@ public abstract class RuntimeTask { private final AtomicBoolean taskDone; private final TaskCounterUpdater counterUpdater; private final TaskStatistics statistics; - private volatile boolean progressNotified; + private final AtomicBoolean progressNotified = new AtomicBoolean(false); protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf, TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) { @@ -105,13 +105,12 @@ public abstract class RuntimeTask { this.fatalErrorMessage = message; } - public void notifyProgressInvocation() { - progressNotified = true; + public final void notifyProgressInvocation() { + progressNotified.lazySet(true); } public boolean getAndClearProgressNotification() { - boolean retVal = progressNotified; - progressNotified = false; + boolean retVal = progressNotified.getAndSet(false); return retVal; } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java index 74592c6..e35e332 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.runtime.InputReadyTracker; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.MergedInputContext; @@ -38,10 +39,12 @@ public class TezMergedInputContextImpl implements MergedInputContext { private final Map<String, MergedLogicalInput> groupInputsMap; private final InputReadyTracker inputReadyTracker; private final String[] workDirs; + private final LogicalIOProcessorRuntimeTask runtimeTask; public TezMergedInputContextImpl(@Nullable UserPayload userPayload, String groupInputName, Map<String, MergedLogicalInput> groupInputsMap, - InputReadyTracker inputReadyTracker, String[] workDirs) { + InputReadyTracker inputReadyTracker, String[] workDirs, + LogicalIOProcessorRuntimeTask runtimeTask) { checkNotNull(groupInputName, "groupInputName is null"); checkNotNull(groupInputsMap, "input-group map is null"); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); @@ -50,6 +53,7 @@ public class TezMergedInputContextImpl implements MergedInputContext { this.userPayload = userPayload; this.inputReadyTracker = inputReadyTracker; this.workDirs = workDirs; + this.runtimeTask = runtimeTask; } @Override @@ -67,4 +71,9 @@ public class TezMergedInputContextImpl implements MergedInputContext { return Arrays.copyOf(workDirs, workDirs.length); } + @Override + public final void notifyProgress() { + runtimeTask.notifyProgressInvocation(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index 211f9d7..c12b334 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -174,7 +174,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { } @Override - public void notifyProgress() { + public final void notifyProgress() { runtimeTask.notifyProgressInvocation(); } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java index a77e38f..29c5023 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java @@ -156,8 +156,10 @@ public class TestInputReadyTracker { group2Inputs.add(input4); Map<String, MergedLogicalInput> mergedInputMap = new HashMap<String, MergedLogicalInput>(); - MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(null, "group1", mergedInputMap, inputReadyTracker, null); - MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(null, "group2", mergedInputMap, inputReadyTracker, null); + MergedInputContext mergedInputContext1 = new TezMergedInputContextImpl( + null, "group1", mergedInputMap, inputReadyTracker, null, null); + MergedInputContext mergedInputContext2 = new TezMergedInputContextImpl( + null, "group2", mergedInputMap, inputReadyTracker, null, null); AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(mergedInputContext1, group1Inputs); AllMergedInputForTest group2 = new AllMergedInputForTest(mergedInputContext2, group2Inputs); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index a8dd1b2..57bb121 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.readers; import java.io.IOException; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { private final int ifileBufferSize; private final TezCounter inputRecordCounter; + private final InputContext context; private K key; private V value; @@ -75,10 +77,10 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize, - TezCounter inputRecordCounter) + TezCounter inputRecordCounter, InputContext context) throws IOException { this.shuffleManager = shuffleManager; - + this.context = context; this.codec = codec; this.ifileReadAhead = ifileReadAhead; this.ifileReadAheadLength = ifileReadAheadLength; @@ -113,6 +115,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { public boolean next() throws IOException { if (readNextFromCurrentReader()) { inputRecordCounter.increment(1); + context.notifyProgress(); numRecordsRead++; return true; } else { @@ -120,6 +123,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { while (nextInputExists) { if(readNextFromCurrentReader()) { inputRecordCounter.increment(1); + context.notifyProgress(); numRecordsRead++; return true; } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 818cfaa..431ba38 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -397,6 +397,7 @@ public class ShuffleUtils { @Nullable long[] partitionStats) throws IOException { Preconditions.checkArgument(eventList != null, "EventList can't be null"); + context.notifyProgress(); if (finalMergeEnabled) { Preconditions.checkArgument(isLastEvent, "Can not send multiple events when final merge is " + "enabled"); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index a7c1c59..b3e050a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -604,6 +604,7 @@ public class ShuffleManager implements FetcherCallback { lock.unlock(); } + inputContext.notifyProgress(); boolean committed = false; if (!completedInputSet.contains(inputIdentifier)) { synchronized (completedInputSet) { @@ -760,6 +761,7 @@ public class ShuffleManager implements FetcherCallback { + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed); failedShufflesCounter.increment(1); + inputContext.notifyProgress(); if (srcAttemptIdentifier == null) { reportFatalError(null, "Received fetchFailure for an unknown src (null)"); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index fb9b243..61ff338 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -84,7 +84,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { private final LocalDirAllocator localDirAllocator; private final TezTaskOutputFiles mapOutputFile; - private final Progressable nullProgressable = new NullProgressable(); + private final Progressable progressable = new Progressable() { + @Override + public void progress() { + inputContext.notifyProgress(); + } + }; private final Combiner combiner; private final Set<MapOutput> inMemoryMergedMapOutputs = @@ -624,6 +629,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { return; } + inputContext.notifyProgress(); InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); List<Segment> inMemorySegments = new ArrayList<Segment>(); @@ -650,8 +656,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { inMemorySegments, inMemorySegments.size(), new Path(inputContext.getUniqueIdentifier()), (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), - nullProgressable, null, null, null, null); - TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + progressable, null, null, null, null); + TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); writer.close(); LOG.info(inputContext.getSourceVertexName() + @@ -697,6 +703,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } numMemToDiskMerges.increment(1); + inputContext.notifyProgress(); //name this output file same as the name of the first file that is //there in the current list of inmem files (this is guaranteed to @@ -716,7 +723,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { // TODO Maybe track serialized vs deserialized bytes. - // All disk writes done by this merge are overhead - due to the lac of + // All disk writes done by this merge are overhead - due to the lack of // adequate memory to keep all segments in memory. outputPath = mapOutputFile.getInputFileForWrite( srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(), @@ -742,13 +749,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { (Class)ConfigUtils.getIntermediateInputValueClass(conf), inMemorySegments, inMemorySegments.size(), tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), - nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null); + progressable, spilledRecordsCounter, null, additionalBytesRead, null); // spilledRecordsCounter is tracking the number of keys that will be // read from each of the segments being merged - which is essentially // what will be written to disk. if (null == combiner) { - TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); } else { // TODO Counters for Combine runCombineProcessor(rIter, writer); @@ -818,7 +825,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { return; } numDiskToDiskMerges.increment(1); - + inputContext.notifyProgress(); + long approxOutputSize = 0; int bytesPerSum = conf.getInt("io.bytes.per.checksum", 512); @@ -879,13 +887,13 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { inputSegments, ioSortFactor, tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), - nullProgressable, true, spilledRecordsCounter, null, + progressable, true, spilledRecordsCounter, null, mergedMapOutputsCounter, null); // TODO Maybe differentiate between data written because of Merges and // the finalMerge (i.e. final mem available may be different from // initial merge mem) - TezMerger.writeFile(iter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + TezMerger.writeFile(iter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); writer.close(); additionalBytesWritten.increment(writer.getCompressedLength()); } catch (IOException e) { @@ -1010,6 +1018,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } } + inputContext.notifyProgress(); // merge config params Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job); @@ -1046,12 +1055,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE, inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX); final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass, - memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable, + memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable, spilledRecordsCounter, null, additionalBytesRead, null); final Writer writer = new Writer(job, fs, outputPath, keyClass, valueClass, codec, null, null); try { - TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); } catch (IOException e) { if (null != outputPath) { try { @@ -1127,7 +1136,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { TezRawKeyValueIterator diskMerge = TezMerger.merge( job, fs, keyClass, valueClass, codec, diskSegments, ioSortFactor, numInMemSegments, tmpDir, comparator, - nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null); + progressable, false, spilledRecordsCounter, null, additionalBytesRead, null); diskSegments.clear(); if (0 == finalSegments.size()) { return diskMerge; @@ -1138,7 +1147,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { // This is doing nothing but creating an iterator over the segments. return TezMerger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, - comparator, nullProgressable, spilledRecordsCounter, null, + comparator, progressable, spilledRecordsCounter, null, additionalBytesRead, null); } } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index b5dcd4c..de3b2cb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -309,6 +309,7 @@ public class Shuffle implements ExceptionReporter { // Finish the on-going merges... TezRawKeyValueIterator kvIter = null; + inputContext.notifyProgress(); try { kvIter = merger.close(); } catch (Throwable e) { @@ -317,7 +318,8 @@ public class Shuffle implements ExceptionReporter { throw new ShuffleError("Error while doing final merge ", e); } mergePhaseTime.setValue(System.currentTimeMillis() - startTime); - + + inputContext.notifyProgress(); // Sanity check synchronized (Shuffle.this) { if (throwable.get() != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 22da46c..dcfb274 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -466,6 +466,7 @@ class ShuffleScheduler { boolean isLocalFetch ) throws IOException { + inputContext.notifyProgress(); if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) { if (!isLocalFetch) { /** @@ -626,7 +627,7 @@ class ShuffleScheduler { boolean connectError, boolean isLocalFetch) { failedShuffleCounter.increment(1); - + inputContext.notifyProgress(); int failures = incrementAndGetFailureAttempt(srcAttempt); if (!isLocalFetch) { http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index aa521ea..7a2dc68 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -59,7 +59,6 @@ import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput; -import org.apache.tez.runtime.library.hadoop.compat.NullProgressable; import com.google.common.base.Preconditions; @@ -72,6 +71,7 @@ public abstract class ExternalSorter { spillFileIndexPaths.clear(); spillFilePaths.clear(); reportStatistics(); + outputContext.notifyProgress(); } public abstract void flush() throws IOException; @@ -86,7 +86,13 @@ public abstract class ExternalSorter { } } - protected final Progressable nullProgressable = new NullProgressable(); + protected final Progressable progressable = new Progressable() { + @Override + public void progress() { + outputContext.notifyProgress(); + } + }; + protected final OutputContext outputContext; protected final Combiner combiner; protected final Partitioner partitioner; @@ -298,6 +304,7 @@ public abstract class ExternalSorter { protected void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer) throws IOException { try { + outputContext.notifyProgress(); combiner.combine(kvIter, writer); } catch (InterruptedException e) { Thread.currentThread().interrupt(); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 2d53a2e..33a65d2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -429,6 +429,7 @@ public class PipelinedSorter extends ExternalSorter { span.kvmeta.put(valstart); span.kvmeta.put(valend - valstart); mapOutputRecordCounter.increment(1); + outputContext.notifyProgress(); mapOutputByteCounter.increment(valend - keystart); } @@ -545,6 +546,7 @@ public class PipelinedSorter extends ExternalSorter { if (isThreadInterrupted()) { return false; } + outputContext.notifyProgress(); TezRawKeyValueIterator kvIter = merger.filter(i); //write merged output to disk long segmentStart = out.getPos(); @@ -611,6 +613,7 @@ public class PipelinedSorter extends ExternalSorter { public void flush() throws IOException { final String uniqueIdentifier = outputContext.getUniqueIdentifier(); + outputContext.notifyProgress(); /** * Possible that the thread got interrupted when flush was happening or when the flush was * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close() @@ -698,6 +701,7 @@ public class PipelinedSorter extends ExternalSorter { } numShuffleChunks.setValue(numSpills); fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen()); + // ??? why are events not being sent here? return; } @@ -742,7 +746,7 @@ public class PipelinedSorter extends ExternalSorter { segmentList, mergeFactor, new Path(uniqueIdentifier), (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf), - nullProgressable, sortSegments, true, + progressable, sortSegments, true, null, spilledRecordsCounter, additionalSpillBytesRead, null); // Not using any Progress in TezMerger. Should just work. //write merged output to disk @@ -751,7 +755,7 @@ public class PipelinedSorter extends ExternalSorter { new Writer(conf, finalOut, keyClass, valClass, codec, spilledRecordsCounter, null, merger.needsRLE()); if (combiner == null || numSpills < minSpillsForCombine) { - TezMerger.writeFile(kvIter, writer, nullProgressable, + TezMerger.writeFile(kvIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); } else { runCombineProcessor(kvIter, writer); @@ -893,7 +897,7 @@ public class PipelinedSorter extends ExternalSorter { public SpanIterator sort(IndexedSorter sorter) { long start = System.currentTimeMillis(); if(length() > 1) { - sorter.sort(this, 0, length(), nullProgressable); + sorter.sort(this, 0, length(), progressable); } LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", " + "time=" + (System.currentTimeMillis() - start)); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index a833228..67da617 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -347,6 +347,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab int valend = bb.markRecord(); mapOutputRecordCounter.increment(1); + outputContext.notifyProgress(); mapOutputByteCounter.increment( distanceTo(keystart, valend, bufvoid)); @@ -662,6 +663,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab @Override public void flush() throws IOException { LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output"); + outputContext.notifyProgress(); if (Thread.currentThread().isInterrupted()) { /** * Possible that the thread got interrupted when flush was happening or when the flush was @@ -710,6 +712,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab sameKeyCount = sameKey; totalKeysCount = totalKeys; } + outputContext.notifyProgress(); sortAndSpill(sameKeyCount, totalKeysCount); } } catch (InterruptedException e) { @@ -835,7 +838,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab throws IOException, InterruptedException { final int mstart = getMetaStart(); final int mend = getMetaEnd(); - sorter.sort(this, mstart, mend, nullProgressable); + sorter.sort(this, mstart, mend, progressable); spill(mstart, mend, sameKeyCount, totalKeysCount); } @@ -1281,6 +1284,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab List<Segment> segmentList = new ArrayList<Segment>(numSpills); for(int i = 0; i < numSpills; i++) { + outputContext.notifyProgress(); TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); Segment s = @@ -1309,7 +1313,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab segmentList, mergeFactor, new Path(taskIdentifier), (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), - nullProgressable, sortSegments, true, + progressable, sortSegments, true, null, spilledRecordsCounter, additionalSpillBytesRead, null); // Not using any Progress in TezMerger. Should just work. @@ -1320,7 +1324,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab spilledRecordsCounter, null); if (combiner == null || numSpills < minSpillsForCombine) { TezMerger.writeFile(kvIter, writer, - nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); + progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); } else { runCombineProcessor(kvIter, writer); } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 70b345f..ce410be 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -248,8 +248,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit throw new IOException("Exception during spill", new IOException(spillException)); } if (skipBuffers) { - //special case, where we have only one partition and pipeliing is disabled. - writer.append(key, value); + //special case, where we have only one partition and pipelining is disabled. + writer.append(key, value); // ???? Why is outputrecordscounter not updated here? + outputContext.notifyProgress(); } else { int partition = partitioner.getPartition(key, value, numPartitions); write(key, value, partition); @@ -319,6 +320,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE)); outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip); outputRecordsCounter.increment(1); + outputContext.notifyProgress(); currentBuffer.partitionPositions[partition] = metaStart; currentBuffer.recordsPerPartition[partition]++; currentBuffer.numRecords++; @@ -407,6 +409,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit long compressedLength = 0; for (int i = 0; i < numPartitions; i++) { IFile.Writer writer = null; + outputContext.notifyProgress(); try { long segmentStart = out.getPos(); if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) { @@ -556,6 +559,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit boolean isLastSpill, String pathComponent, BitSet emptyPartitions) throws IOException { + outputContext.notifyProgress(); DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); @@ -710,6 +714,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } synchronized (spillInfoList) { for (SpillInfo spillInfo : spillInfoList) { + outputContext.notifyProgress(); TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i); if (indexRecord.getPartLength() == 0) { // Skip empty partitions within a spill http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java index 45784d9..0b8ed21 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java @@ -53,6 +53,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { if (currentReaderIndex == getInputs().size()) { hasCompletedProcessing(); completedProcessing = true; + getContext().notifyProgress(); return false; } try { @@ -63,6 +64,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { } currentReader = (KeyValueReader) reader; currentReaderIndex++; + getContext().notifyProgress(); } catch (Exception e) { // An InterruptedException is not expected here since this works off of // underlying readers which take care of throwing IOInterruptedExceptions http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java index 27ff324..4a8969e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java @@ -54,6 +54,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { if (currentReaderIndex == getInputs().size()) { hasCompletedProcessing(); completedProcessing = true; + getContext().notifyProgress(); return false; } try { @@ -64,6 +65,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { } currentReader = (KeyValuesReader) reader; currentReaderIndex++; + getContext().notifyProgress(); } catch (Exception e) { // An InterruptedException is not expected here since this works off of // underlying readers which take care of throwing IOInterruptedExceptions http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 39cc471..5e367cf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -229,6 +229,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { return new KeyValuesReader() { @Override public boolean next() throws IOException { + getContext().notifyProgress(); hasCompletedProcessing(); completedProcessing = true; return false; @@ -259,7 +260,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { synchronized(this) { valuesIter = vIter; } - return new OrderedGroupedKeyValuesReader(valuesIter); + return new OrderedGroupedKeyValuesReader(valuesIter, getContext()); } @Override @@ -307,13 +308,16 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { private static class OrderedGroupedKeyValuesReader extends KeyValuesReader { private final ValuesIterator valuesIter; + private final InputContext context; - OrderedGroupedKeyValuesReader(ValuesIterator valuesIter) { + OrderedGroupedKeyValuesReader(ValuesIterator valuesIter, InputContext context) { this.valuesIter = valuesIter; + this.context = context; } @Override public boolean next() throws IOException { + context.notifyProgress(); return valuesIter.moveToNext(); } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java index 41ca7c9..2345bbb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java @@ -60,7 +60,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput { */ @Override public KeyValuesReader getReader() throws Exception { - return new OrderedGroupedMergedKeyValuesReader(getInputs()); + return new OrderedGroupedMergedKeyValuesReader(getInputs(), getContext()); } @Override @@ -81,8 +81,10 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput { private final ValuesIterable currentValues; private KeyValuesReader nextKVReader; private Object currentKey; + private final MergedInputContext context; - public OrderedGroupedMergedKeyValuesReader(List<Input> inputs) throws Exception { + public OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext context) + throws Exception { keyComparator = ((OrderedGroupedKVInput) inputs.get(0)) .getInputKeyComparator(); pQueue = new PriorityQueue<KeyValuesReader>(inputs.size(), @@ -95,6 +97,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput { } } currentValues = new ValuesIterable(); + this.context = context; } private void advanceAndAddToQueue(KeyValuesReader kvsReadr) @@ -122,6 +125,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput { finishedReaders.clear(); nextKVReader = pQueue.poll(); + context.notifyProgress(); if (nextKVReader != null) { currentKey = nextKVReader.getCurrentKey(); currentValues.moveToNext(); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index fad164f..dbbe23f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -169,6 +169,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { return new KeyValueReader() { @Override public boolean next() throws IOException { + getContext().notifyProgress(); hasCompletedProcessing(); completedProcessing = true; return false; @@ -241,7 +242,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { int ifileBufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength) throws IOException { return new UnorderedKVReader(shuffleManager, conf, codec, ifileReadAheadEnabled, - ifileReadAheadLength, ifileBufferSize, inputRecordCounter); + ifileReadAheadLength, ifileBufferSize, inputRecordCounter, getContext()); } private static final Set<String> confKeys = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java index 80bdc42..c49a423 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; @@ -125,7 +126,7 @@ public class TestUnorderedKVReader { }).when(manager).getNextInput(); unorderedKVReader = new UnorderedKVReader<Text, Text>(manager, - defaultConf, null, false, -1, -1, inputRecords); + defaultConf, null, false, -1, -1, inputRecords, mock(InputContext.class)); } private void createIFile(Path path, int recordCount) throws IOException { @@ -177,7 +178,7 @@ public class TestUnorderedKVReader { TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED); UnorderedKVReader<Text, Text> reader = new UnorderedKVReader<Text, Text>(shuffleManager, defaultConf, null, false, -1, -1, - inputRecords); + inputRecords, mock(InputContext.class)); try { reader.next(); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 214ec45..b8f99de 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -328,6 +329,8 @@ public class TestMergeManager { assertEquals(m1Prefix, m2Prefix); assertNotEquals(m1Prefix, m3Prefix); assertNotEquals(m2Prefix, m3Prefix); + + verify(inputContext, atLeastOnce()).notifyProgress(); } http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 3fe540c..1a6c3be 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -152,6 +152,8 @@ public class TestShuffleScheduler { scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false); scheduler.freeHost(mapHosts[i]); } + + verify(inputContext, atLeast(numInputs)).notifyProgress(); // Ensure the executor exits, and without an error. executorFuture.get(); http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 2cebea4..70819e5 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.runtime.library.common.sort.impl; import com.google.common.collect.Maps; @@ -39,30 +57,13 @@ import java.util.TreeMap; import java.util.UUID; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.anyListOf; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ public class TestPipelinedSorter { private static FileSystem localFs = null; private static Path workDir = null; @@ -417,6 +418,7 @@ public class TestPipelinedSorter { //Verify dataset verifyData(reader); reader.close(); + verify(outputContext, atLeastOnce()).notifyProgress(); } private void verifyCounters(PipelinedSorter sorter, OutputContext context) { http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index b531464..e0374a3 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -429,6 +430,7 @@ public class TestDefaultSorter { TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); assertTrue(outputBytesWithOverheadCounter.getValue() > 0); + verify(context, atLeastOnce()).notifyProgress(); } private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/5ec498d8/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 1a10eb8..e7a2125 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -25,7 +25,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -585,6 +584,8 @@ public class TestUnorderedPartitionedKVWriter { assertTrue(eventProto.hasPathComponent()); } + verify(outputContext, atLeast(1)).notifyProgress(); + // Verify if all spill files are available. TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId); @@ -784,6 +785,7 @@ public class TestUnorderedPartitionedKVWriter { expectedValues.remove(i); } assertEquals(0, expectedValues.size()); + verify(outputContext, atLeast(1)).notifyProgress(); } private static String createRandomString(int size) {
