TEZ-2844. Backport TEZ-2775 to branch-0.7. Improve and consolidate logging in Runtime components. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cd80d9ab Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cd80d9ab Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cd80d9ab Branch: refs/heads/branch-0.7 Commit: cd80d9ab592f8c116031087da01ac3f6a84a86d2 Parents: c7a946c Author: Siddharth Seth <[email protected]> Authored: Tue Sep 22 11:04:25 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Sep 22 11:04:25 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../resources/tez-container-log4j.properties | 2 +- .../hadoop/mapred/split/TezGroupedSplit.java | 12 +++ .../hadoop/mapreduce/split/TezGroupedSplit.java | 12 +++ .../org/apache/tez/mapreduce/input/MRInput.java | 36 ++++--- .../tez/mapreduce/input/MRInputLegacy.java | 8 +- .../tez/mapreduce/input/MultiMRInput.java | 22 ++-- .../apache/tez/mapreduce/output/MROutput.java | 13 ++- .../runtime/LogicalIOProcessorRuntimeTask.java | 39 ++++++- .../apache/tez/runtime/task/TaskReporter.java | 2 +- .../library/common/InputAttemptIdentifier.java | 2 +- .../runtime/library/common/TezRuntimeUtils.java | 8 +- .../runtime/library/common/ValuesIterator.java | 2 - .../runtime/library/common/shuffle/Fetcher.java | 103 +++++++++++++------ .../library/common/shuffle/HttpConnection.java | 4 +- .../common/shuffle/ShuffleEventHandler.java | 1 + .../library/common/shuffle/ShuffleUtils.java | 40 +++++-- .../impl/ShuffleInputEventHandlerImpl.java | 41 ++++++-- .../common/shuffle/impl/ShuffleManager.java | 82 +++++++++------ .../impl/SimpleFetchedInputAllocator.java | 26 +++-- .../orderedgrouped/FetcherOrderedGrouped.java | 86 +++++++++++----- .../shuffle/orderedgrouped/MergeManager.java | 52 +++++----- .../common/shuffle/orderedgrouped/Shuffle.java | 36 ++++--- .../ShuffleInputEventHandlerOrderedGrouped.java | 42 ++++++-- .../orderedgrouped/ShuffleScheduler.java | 56 ++++++---- 
.../common/sort/impl/ExternalSorter.java | 26 +++-- .../common/sort/impl/PipelinedSorter.java | 101 +++++++++++------- .../common/sort/impl/dflt/DefaultSorter.java | 69 +++++++------ .../writers/UnorderedPartitionedKVWriter.java | 49 +++++---- .../library/input/OrderedGroupedKVInput.java | 20 ++-- .../runtime/library/input/UnorderedKVInput.java | 16 ++- .../output/OrderedPartitionedKVOutput.java | 6 +- .../library/output/UnorderedKVOutput.java | 6 +- .../output/UnorderedPartitionedKVOutput.java | 4 +- .../impl/TestSimpleFetchedInputAllocator.java | 2 +- ...tShuffleInputEventHandlerOrderedGrouped.java | 3 +- .../sort/impl/dflt/TestDefaultSorter.java | 10 +- 37 files changed, 697 insertions(+), 343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2b6204c..9fc3555 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2844. Backport TEZ-2775 to branch-0.7. Improve and consolidate logging in Runtime components. TEZ-2843. Tez UI: Show error if in progress fails due to AM not reachable TEZ-2842. Tez UI: Update Tez App details page while in-progress TEZ-2834. 
Make Tez preemption resilient to incorrect free resource reported http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-dag/src/main/resources/tez-container-log4j.properties ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties index c53994e..4620a78 100644 --- a/tez-dag/src/main/resources/tez-container-log4j.properties +++ b/tez-dag/src/main/resources/tez-container-log4j.properties @@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir} log4j.appender.CLA.layout=org.apache.log4j.PatternLayout -log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t]|| %c{2} %m%n: +log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n # # Event Counter Appender http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java index 4f3a0f2..a9893aa 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -175,4 +176,15 @@ public class TezGroupedSplit implements InputSplit, Configurable { public String getRack() { return rack; } + + @Override + public String toString() { + return "TezGroupedSplit{" + + "wrappedSplits=" + wrappedSplits 
+ + ", wrappedInputFormatName='" + wrappedInputFormatName + '\'' + + ", locations=" + Arrays.toString(locations) + + ", rack='" + rack + '\'' + + ", length=" + length + + '}'; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java index f85bbcd..430d2ec 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -185,4 +186,15 @@ public class TezGroupedSplit extends InputSplit public String getRack() { return rack; } + + @Override + public String toString() { + return "TezGroupedSplit{" + + "wrappedSplits=" + wrappedSplits + + ", wrappedInputFormatName='" + wrappedInputFormatName + '\'' + + ", locations=" + Arrays.toString(locations) + + ", rack='" + rack + '\'' + + ", length=" + length + + '}'; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 70365cd..93161cb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -437,8 +437,9 @@ public 
class MRInput extends MRInputBase { getContext().inputIsReady(); this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT); - LOG.info("Using New mapreduce API: " + useNewApi - + ", split information via event: " + splitInfoViaEvents); + LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi + + ", split via event=" + splitInfoViaEvents + ", numPhysicalInputs=" + + getNumPhysicalInputs()); initializeInternal(); return null; } @@ -447,7 +448,6 @@ public class MRInput extends MRInputBase { public void start() { Preconditions.checkState(getNumPhysicalInputs() == 0 || getNumPhysicalInputs() == 1, "Expecting 0 or 1 physical input for MRInput"); - LOG.info("MRInput setup to received {} events", getNumPhysicalInputs()); } @Private @@ -490,7 +490,7 @@ public class MRInput extends MRInputBase { } finally { rrLock.unlock(); } - LOG.info("Initialzed MRInput: " + getContext().getSourceVertexName()); + LOG.info("Initialized MRInput: " + getContext().getSourceVertexName()); } /** @@ -588,7 +588,9 @@ public class MRInput extends MRInputBase { rrLock.lock(); try { initFromEventInternal(event); - LOG.info("Notifying on RecordReader Initialized"); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " notifying on RecordReader initialized"); + } rrInited.signal(); } finally { rrLock.unlock(); @@ -599,7 +601,9 @@ public class MRInput extends MRInputBase { assert rrLock.getHoldCount() == 1; rrLock.lock(); try { - LOG.info("Awaiting RecordReader initialization"); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " awaiting RecordReader initialization"); + } rrInited.await(); } catch (Exception e) { throw new IOException( @@ -621,22 +625,30 @@ public class MRInput extends MRInputBase { } private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException { - LOG.info("Initializing RecordReader from event"); + 
if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " initializing RecordReader from event"); + } Preconditions.checkState(initEvent != null, "InitEvent must be specified"); MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload())); Object split = null; if (useNewApi) { split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf); - LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: " - + split); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + split.getClass().getName() + ", NewSplit: " + + split); + } } else { split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, jobConf); - LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: " - + split); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + split.getClass().getName() + ", OldSplit: " + + split); + } } mrReader.setSplit(split); - LOG.info("Initialized RecordReader from event"); + LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event"); } private static class MRInputHelpersInternal extends MRInputHelpers { http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java index d825d53..e83c36a 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java @@ -73,7 +73,7 @@ public class MRInputLegacy extends MRInput { @Private protected void initializeInternal() throws IOException { - 
LOG.info("MRInputLegacy deferring initialization"); + LOG.info(getContext().getSourceVertexName() + " MRInputLegacy deferring initialization"); } @Private @@ -130,7 +130,11 @@ public class MRInputLegacy extends MRInput { } if (splitInfoViaEvents && !inited) { if (initEvent == null) { - LOG.info("Awaiting init event before initializing record reader"); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + + " awaiting init event before initializing record reader"); + } + try { eventCondition.await(); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index 44d9c96..4a792dc 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java @@ -108,8 +108,8 @@ public class MultiMRInput extends MRInputBase { @Override public List<Event> initialize() throws IOException { super.initialize(); - LOG.info("Using New mapreduce API: " + useNewApi + ", numPhysicalInputs: " - + getNumPhysicalInputs()); + LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi + + ", numPhysicalInputs=" + getNumPhysicalInputs()); if (getNumPhysicalInputs() == 0) { getContext().inputIsReady(); } @@ -164,7 +164,9 @@ public class MultiMRInput extends MRInputBase { private MRReader initFromEvent(InputDataInformationEvent event) throws IOException { Preconditions.checkState(event != null, "Event must be specified"); - LOG.info("Initializing Reader: " + eventCount.get()); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + 
eventCount.get()); + } MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload())); Object split = null; MRReader reader = null; @@ -176,17 +178,21 @@ public class MultiMRInput extends MRInputBase { .getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext() .getApplicationId().getId(), getContext().getTaskIndex(), getContext() .getTaskAttemptNumber()); - LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: " - + split); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + split.getClass().getName() + ", NewSplit: " + split); + } } else { split = MRInputUtils.getOldSplitDetailsFromEvent(splitProto, localJobConf); reader = new MRReaderMapred(localJobConf, (org.apache.hadoop.mapred.InputSplit) split, getContext().getCounters(), inputRecordCounter); - LOG.info("Split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: " - + split); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + split.getClass().getName() + ", OldSplit: " + split); + } } - LOG.info("Initialized RecordReader from event"); + LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event"); return reader; } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index d19f707..7136482 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -343,7 +343,6 @@ public class MROutput extends AbstractLogicalOutput { @Override public List<Event> 
initialize() throws IOException, InterruptedException { - LOG.info("Initializing Simple Output"); getContext().requestInitialMemory(0l, null); //mandatory call taskNumberFormat.setMinimumIntegerDigits(5); taskNumberFormat.setGroupingUsed(false); @@ -380,6 +379,8 @@ public class MROutput extends AbstractLogicalOutput { } } + String outputFormatClassName; + outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS); if (useNewApi) { @@ -388,6 +389,7 @@ public class MROutput extends AbstractLogicalOutput { newOutputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance( newApiTaskAttemptContext.getOutputFormatClass(), jobConf); + outputFormatClassName = newOutputFormat.getClass().getName(); } catch (ClassNotFoundException cnfe) { throw new IOException(cnfe); } @@ -404,6 +406,7 @@ public class MROutput extends AbstractLogicalOutput { jobConf, taskAttemptId, new MRTaskReporter(getContext())); oldOutputFormat = jobConf.getOutputFormat(); + outputFormatClassName = oldOutputFormat.getClass().getName(); FileSystem fs = FileSystem.get(jobConf); String finalName = getOutputName(); @@ -414,8 +417,9 @@ public class MROutput extends AbstractLogicalOutput { } initCommitter(jobConf, useNewApi); - LOG.info("Initialized Simple Output" - + ", using_new_api: " + useNewApi); + LOG.info(getContext().getDestinationVertexName() + ": " + + "outputFormat=" + outputFormatClassName + + ", using newmapreduce API=" + useNewApi); return null; } @@ -517,6 +521,7 @@ public class MROutput extends AbstractLogicalOutput { @Override public synchronized List<Event> close() throws IOException { flush(); + LOG.info(getContext().getDestinationVertexName() + " closed"); long outputRecords = getContext().getCounters() .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); @@ -534,7 +539,6 @@ public class MROutput extends AbstractLogicalOutput { return; } - LOG.info("Flushing Simple Output"); if 
(useNewApi) { try { newRecordWriter.close(newApiTaskAttemptContext); @@ -544,7 +548,6 @@ public class MROutput extends AbstractLogicalOutput { } else { oldRecordWriter.close(null); } - LOG.info("Flushed Simple Output"); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 92035e1..c61bb4e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -404,6 +404,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Override protected Void callInternal() throws Exception { + String oldThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(oldThreadName + "{" + inputSpec.getSourceVertexName() + "}"); + return _callInternal(); + } finally { + Thread.currentThread().setName(oldThreadName); + } + } + + protected Void _callInternal() throws Exception { + if (LOG.isDebugEnabled()) { LOG.debug("Initializing Input using InputSpec: " + inputSpec); } @@ -437,6 +448,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Override protected Void callInternal() throws Exception { + String oldThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(oldThreadName + " Start: {" + srcVertexName + "}"); + return _callInternal(); + } finally { + Thread.currentThread().setName(oldThreadName); + } + } + + protected Void _callInternal() throws Exception { + Thread.currentThread().setName("InitializerStart {" + srcVertexName + "}"); if (LOG.isDebugEnabled()) { 
LOG.debug("Starting Input with src edge: " + srcVertexName); } @@ -459,6 +481,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Override protected Void callInternal() throws Exception { + String oldThreadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(oldThreadName + "{" + outputSpec.getDestinationVertexName() + "}"); + return _callInternal(); + } finally { + Thread.currentThread().setName(oldThreadName); + } + } + + protected Void _callInternal() throws Exception { + Thread.currentThread().setName("Initializer {" + outputSpec.getDestinationVertexName() + "}"); if (LOG.isDebugEnabled()) { LOG.debug("Initializing Output using OutputSpec: " + outputSpec); } @@ -727,8 +760,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } }); - eventRouterThread.setName("TezTaskEventRouter[" - + taskSpec.getTaskAttemptID().toString() + "]"); + eventRouterThread.setName("TezTaskEventRouter{" + + taskSpec.getTaskAttemptID().toString() + "}"); eventRouterThread.start(); } @@ -756,7 +789,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } public void cleanup() throws InterruptedException { - LOG.info("Final Counters : " + getCounters().toShortString()); + LOG.info("Final Counters for " + taskSpec.getTaskAttemptID() + ": " + getCounters().toShortString()); setTaskDone(); if (eventRouterThread != null) { eventRouterThread.interrupt(); http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index d9a7786..bf93ce3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -278,7 +278,7 @@ public class TaskReporter { int numEventsReceived = 0; if (task.isTaskDone() || task.hadFatalError()) { if (response.getEvents() != null && !response.getEvents().isEmpty()) { - LOG.info("Current task already complete, Ignoring all event in" + LOG.info("Current task already complete, Ignoring all events in" + " heartbeat response, eventCount=" + response.getEvents().size()); } } else { http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java index 59fb638..d70942c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java @@ -146,6 +146,6 @@ public class InputAttemptIdentifier { public String toString() { return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier + ", attemptNumber=" + attemptNumber + ", pathComponent=" - + pathComponent + ", fetchTypeInfo=" + fetchTypeInfo + ", spillEventId=" + spillEventId +"]"; + + pathComponent + ", spillType=" + fetchTypeInfo.ordinal() + ", spillId=" + spillEventId +"]"; } } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index 69436ba..819423f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -64,7 +64,9 @@ public class TezRuntimeUtils { if (className == null) { return null; } - LOG.info("Using Combiner class: " + className); + if (LOG.isDebugEnabled()) { + LOG.debug("Using Combiner class: " + className); + } try { clazz = (Class<? extends Combiner>) conf.getClassByName(className); } catch (ClassNotFoundException e) { @@ -105,7 +107,9 @@ public class TezRuntimeUtils { + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS), e); } - LOG.info("Using partitioner class: " + clazz.getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Using partitioner class: " + clazz.getName()); + } Partitioner partitioner = null; http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java index 24f9f8a..f4da742 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java @@ -45,7 +45,6 @@ import com.google.common.base.Preconditions; @Private public class ValuesIterator<KEY,VALUE> { - private static final Logger LOG = LoggerFactory.getLogger(ValuesIterator.class.getName()); protected TezRawKeyValueIterator in; //input iterator private KEY key; // current key private KEY nextKey; @@ -82,7 +81,6 @@ public class ValuesIterator<KEY,VALUE> { 
this.keyDeserializer.open(keyIn); this.valDeserializer = serializationFactory.getDeserializer(valClass); this.valDeserializer.open(this.valueIn); - LOG.info("keyDeserializer=" + keyDeserializer + "; valueDeserializer=" + valDeserializer); } TezRawKeyValueIterator getRawIterator() { return in; } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 0dee90c..be68cc1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -202,8 +202,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> { fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed); } } else { - LOG.info("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length + - " inputs since the fetcher has already been stopped"); + if (isDebugEnabled) { + LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length + + " inputs since the fetcher has already been stopped"); + } } } @@ -409,9 +411,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // indirectly penalizing the host InputAttemptIdentifier[] failedFetches = null; if (isShutDown.get()) { - LOG.info( - "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + - e.getClass().getName() + ", Message: " + e.getMessage()); + if (isDebugEnabled) { + LOG.debug( + "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." 
+ + e.getClass().getName() + ", Message: " + e.getMessage()); + } } else { failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]); } @@ -420,7 +424,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { if (isShutDown.get()) { // shutdown would have no effect if in the process of establishing the connection. shutdownInternal(); - LOG.info("Detected fetcher has been shutdown after connection establishment. Returning"); + if (isDebugEnabled) { + LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning"); + } return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false); } @@ -434,9 +440,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // with the first map, typically lost map. So, penalize only that map // and add the rest if (isShutDown.get()) { - LOG.info( - "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + - e.getClass().getName() + ", Message: " + e.getMessage()); + if (isDebugEnabled) { + LOG.debug( + "Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + + e.getClass().getName() + ", Message: " + e.getMessage()); + } } else { InputAttemptIdentifier firstAttempt = attempts.get(0); LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt @@ -462,7 +470,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { if (isShutDown.get()) { // shutdown would have no effect if in the process of establishing the connection. shutdownInternal(); - LOG.info("Detected fetcher has been shutdown after opening stream. Returning"); + if (isDebugEnabled) { + LOG.debug("Detected fetcher has been shutdown after opening stream. 
Returning"); + } return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false); } // After this point, closing the stream and connection, should cause a @@ -477,7 +487,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { while (!remaining.isEmpty() && failedInputs == null) { if (isShutDown.get()) { shutdownInternal(true); - LOG.info("Fetcher already shutdown. Aborting queued fetches for " + remaining.size() + " inputs"); + if (isDebugEnabled) { + LOG.debug("Fetcher already shutdown. Aborting queued fetches for " + remaining.size() + " inputs"); + } return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false); } @@ -487,7 +499,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { //clean up connection shutdownInternal(true); if (isShutDown.get()) { - LOG.info("Fetcher already shutdown. Aborting reconnection and queued fetches for " + remaining.size() + " inputs"); + if (isDebugEnabled) { + LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " + remaining.size() + " inputs"); + } return new HostFetchResult(new FetchResult(host, port, partition, remaining), null, false); } @@ -501,8 +515,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } if (isShutDown.get() && failedInputs != null && failedInputs.length > 0) { - LOG.info("Fetcher already shutdown. Not reporting fetch failures for: " + - failedInputs.length + " failed inputs"); + if (isDebugEnabled) { + LOG.debug("Fetcher already shutdown. Not reporting fetch failures for: " + + failedInputs.length + " failed inputs"); + } failedInputs = null; } return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs, @@ -520,7 +536,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { Iterator<InputAttemptIdentifier> iterator = remaining.iterator(); while (iterator.hasNext()) { if (isShutDown.get()) { - LOG.info("Already shutdown. 
Skipping fetch for " + remaining.size() + " inputs"); + if (isDebugEnabled) { + LOG.debug("Already shutdown. Skipping fetch for " + remaining.size() + " inputs"); + } break; } InputAttemptIdentifier srcAttemptId = iterator.next(); @@ -545,9 +563,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { @Override public void freeResources(FetchedInput fetchedInput) {} }); - LOG.info("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId - + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() - + " to " + fetchedInput.getType()); + if (isDebugEnabled) { + LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId + + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() + + " to " + fetchedInput.getType()); + } long endTime = System.currentTimeMillis(); fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(), @@ -556,9 +576,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } catch (IOException e) { cleanupFetchedInput(fetchedInput); if (isShutDown.get()) { - LOG.info( - "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + " from host " + - host + " : " + e.getClass().getName() + ", message=" + e.getMessage()); + if (isDebugEnabled) { + LOG.debug( + "Already shutdown. 
Ignoring Local Fetch Failure for " + srcAttemptId + + " from host " + + host + " : " + e.getClass().getName() + ", message=" + e.getMessage()); + } break; } LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)", @@ -569,8 +592,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> { InputAttemptIdentifier[] failedFetches = null; if (failMissing && remaining.size() > 0) { if (isShutDown.get()) { - LOG.info("Already shutdown, not reporting fetch failures for: " + remaining.size() + + if (isDebugEnabled) { + LOG.debug("Already shutdown, not reporting fetch failures for: " + remaining.size() + " remaining inputs"); + } } else { failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]); } @@ -623,6 +648,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { public void shutdown() { if (!isShutDown.getAndSet(true)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Shutting down fetcher for host: " + host); + } shutdownInternal(); } } @@ -643,7 +671,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } catch (IOException e) { LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : " + e.getMessage()); - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug(StringUtils.EMPTY, e); } } @@ -678,7 +706,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // Don't know which one was bad, so consider all of them as bad return remaining.toArray(new InputAttemptIdentifier[remaining.size()]); } else { - LOG.info("Already shutdown. Ignoring badId error with message: " + e.getMessage()); + if (isDebugEnabled) { + LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage()); + } return null; } } @@ -694,12 +724,14 @@ public class Fetcher extends CallableWithNdc<FetchResult> { assert (srcAttemptId != null); return new InputAttemptIdentifier[]{srcAttemptId}; } else { - LOG.info("Already shutdown. 
Ignoring verification failure."); + if (isDebugEnabled) { + LOG.debug("Already shutdown. Ignoring verification failure."); + } return null; } } - if (LOG.isDebugEnabled()) { + if (isDebugEnabled) { LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + ", decomp len: " + decompressedLength); } @@ -723,10 +755,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // } // Go! - LOG.info("fetcher" + " about to shuffle output of srcAttempt " - + fetchedInput.getInputAttemptIdentifier() + " decomp: " - + decompressedLength + " len: " + compressedLength + " to " - + fetchedInput.getType()); + if (isDebugEnabled) { + LOG.debug("fetcher" + " about to shuffle output of srcAttempt " + + fetchedInput.getInputAttemptIdentifier() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + + fetchedInput.getType()); + } if (fetchedInput.getType() == Type.MEMORY) { ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), @@ -735,7 +769,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { fetchedInput.getInputAttemptIdentifier().toString()); } else if (fetchedInput.getType() == Type.DISK) { ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), - (host +":" +port), input, compressedLength, LOG, + (host +":" +port), input, compressedLength, decompressedLength, LOG, fetchedInput.getInputAttemptIdentifier().toString()); } else { throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + @@ -765,8 +799,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } catch (IOException ioe) { if (isShutDown.get()) { cleanupFetchedInput(fetchedInput); - LOG.info("Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() + - ", Message: " + ioe.getMessage()); + if (isDebugEnabled) { + LOG.debug( + "Already shutdown. 
Ignoring exception during fetch " + ioe.getClass().getName() + + ", Message: " + ioe.getMessage()); + } return null; } if (shouldRetry(srcAttemptId, ioe)) { http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java index ad6ed19..17e6e90 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java @@ -274,7 +274,9 @@ public class HttpConnection { stopWatch.reset().start(); try { if (input != null) { - LOG.info("Closing input on " + logIdentifier); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing input on " + logIdentifier); + } input.close(); } if (httpConnParams.keepAlive && connectionSucceeed) { http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java index ff66158..da7c944 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleEventHandler.java @@ -25,4 +25,5 @@ import org.apache.tez.runtime.api.Event; public interface ShuffleEventHandler { public void handleEvents(List<Event> 
events) throws IOException; + public void logProgress(boolean updateOnClose); } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 46489ed..1873485 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -107,12 +107,16 @@ public class ShuffleUtils { Logger LOG, String identifier) throws IOException { try { IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec, - ifileReadAhead, ifileReadAheadLength); + ifileReadAhead, ifileReadAheadLength); // metrics.inputBytes(shuffleData.length); - LOG.info("Read " + shuffleData.length + " bytes from input for " - + identifier); + if (LOG.isDebugEnabled()) { + LOG.debug("Read " + shuffleData.length + " bytes from input for " + + identifier); + } } catch (IOException ioe) { // Close the streams + LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength + + ", decomp=" + decompressedLength + ". 
ExceptionMessage=" + ioe.getMessage()); ioCleanup(input); // Re-throw throw ioe; @@ -120,7 +124,7 @@ public class ShuffleUtils { } public static void shuffleToDisk(OutputStream output, String hostIdentifier, - InputStream input, long compressedLength, Logger LOG, String identifier) + InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier) throws IOException { // Copy data to local-disk long bytesLeft = compressedLength; @@ -138,12 +142,16 @@ public class ShuffleUtils { // metrics.inputBytes(n); } - LOG.info("Read " + (compressedLength - bytesLeft) - + " bytes from input for " + identifier); + if (LOG.isDebugEnabled()) { + LOG.debug("Read " + (compressedLength - bytesLeft) + + " bytes from input for " + identifier); + } output.close(); } catch (IOException ioe) { // Close the streams + LOG.info("Failed to read data to disk for " + identifier + ". len=" + compressedLength + + ", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage()); ioCleanup(input, output); // Re-throw throw ioe; @@ -468,10 +476,26 @@ public class ShuffleUtils { } log.info( "Completed fetch for attempt: " - + srcAttemptIdentifier + " to " + outputType + - ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed + + + toShortString(srcAttemptIdentifier) + +" to " + outputType + + ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed + ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + MBPS_FORMAT.get().format(rate) + " MB/s"); } + + private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append(inputAttemptIdentifier.getInputIdentifier().getInputIndex()); + sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber()); + sb.append(", ").append(inputAttemptIdentifier.getPathComponent()); + if (inputAttemptIdentifier.getFetchTypeInfo() + != 
InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) { + sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal()); + sb.append(", ").append(inputAttemptIdentifier.getSpillEventId()); + } + sb.append("}"); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 61b3e3a..8fb1568 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -22,6 +22,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl; import java.io.IOException; import java.util.BitSet; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import com.google.protobuf.ByteString; @@ -32,20 +33,15 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputIdentifier; -import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput; -import org.apache.tez.runtime.library.common.shuffle.FetchedInput; import 
org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; -import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto; import com.google.protobuf.InvalidProtocolBufferException; @@ -57,16 +53,24 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerImpl.class); private final ShuffleManager shuffleManager; + //TODO: unused. Consider removing later? private final FetchedInputAllocator inputAllocator; private final CompressionCodec codec; private final boolean ifileReadAhead; private final int ifileReadAheadLength; private final boolean useSharedInputs; + private final InputContext inputContext; + + private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); + private final AtomicInteger numDmeEvents = new AtomicInteger(0); + private final AtomicInteger numObsoletionEvents = new AtomicInteger(0); + private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0); public ShuffleInputEventHandlerImpl(InputContext inputContext, ShuffleManager shuffleManager, FetchedInputAllocator inputAllocator, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength) { + this.inputContext = inputContext; this.shuffleManager = shuffleManager; this.inputAllocator = inputAllocator; this.codec = codec; @@ -86,13 +90,29 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { private void handleEvent(Event event) throws IOException { if (event instanceof DataMovementEvent) { + numDmeEvents.incrementAndGet(); processDataMovementEvent((DataMovementEvent)event); 
shuffleManager.updateEventReceivedTime(); } else if (event instanceof InputFailedEvent) { - processInputFailedEvent((InputFailedEvent)event); + numObsoletionEvents.incrementAndGet(); + processInputFailedEvent((InputFailedEvent) event); } else { throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName()); } + if (numDmeEvents.get() + numObsoletionEvents.get() > nextToLogEventCount.get()) { + logProgress(false); + // Log every 50 events seen. + nextToLogEventCount.addAndGet(50); + } + } + + @Override + public void logProgress(boolean updateOnClose) { + LOG.info(inputContext.getSourceVertexName() + ": " + + "numDmeEventsSeen=" + numDmeEvents.get() + + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get() + + ", numObsoletionEventsSeen=" + numObsoletionEvents.get() + + (updateOnClose == true ? ", updateOnClose" : "")); } private void processDataMovementEvent(DataMovementEvent dme) throws IOException { @@ -104,9 +124,11 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } int srcIndex = dme.getSourceIndex(); - LOG.info("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex() - + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils - .stringify(shufflePayload)); + if (LOG.isDebugEnabled()) { + LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex() + + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils + .stringify(shufflePayload)); + } if (shufflePayload.hasEmptyPartitions()) { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload @@ -119,6 +141,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: [" + srcAttemptIdentifier + "]. 
Not fetching."); } + numDmeEventsNoData.incrementAndGet(); shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier); return; } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 2cfcc06..99fc18a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -228,11 +228,11 @@ public class ShuffleManager implements FetcherCallback { ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool( numFetchers, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build()); + .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build()); this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build()); + .setDaemon(true).setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}").build()); this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor); this.schedulerCallable = new RunShuffleCallable(conf); @@ -270,7 +270,7 @@ public class ShuffleManager implements FetcherCallback { shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>(); - LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec=" + LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec=" + 
(codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers=" + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled=" + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", " @@ -317,7 +317,7 @@ public class ShuffleManager implements FetcherCallback { } if (LOG.isDebugEnabled()) { - LOG.debug("NumCompletedInputs: " + numCompletedInputs); + LOG.debug(srcNameTrimmed + ": " + "NumCompletedInputs: " + numCompletedInputs); } if (numCompletedInputs.get() < numInputs && !isShutdown.get()) { lock.lock(); @@ -330,20 +330,20 @@ public class ShuffleManager implements FetcherCallback { inputHost = pendingHosts.take(); } catch (InterruptedException e) { if (isShutdown.get()) { - LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); + LOG.info(srcNameTrimmed + ": " + "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); break; } else { throw e; } } if (LOG.isDebugEnabled()) { - LOG.debug("Processing pending host: " + inputHost.toDetailedString()); + LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + inputHost.toDetailedString()); } if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) { Fetcher fetcher = constructFetcherForHost(inputHost, conf); runningFetchers.add(fetcher); if (isShutdown.get()) { - LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); + LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); } ListenableFuture<FetchResult> future = fetcherExecutor .submit(fetcher); @@ -353,7 +353,7 @@ public class ShuffleManager implements FetcherCallback { } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping host: " + inputHost.getIdentifier() + LOG.debug(srcNameTrimmed + ": " + "Skipping host: " + inputHost.getIdentifier() + " since it has no inputs to process"); } } @@ -364,8 +364,7 @@ public class ShuffleManager implements FetcherCallback { } } 
shufflePhaseTime.setValue(System.currentTimeMillis() - startTime); - LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted()); - // TODO NEWTEZ Maybe clean up inputs. + LOG.info(srcNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted()); if (!fetcherExecutor.isShutdown()) { fetcherExecutor.shutdownNow(); } @@ -450,9 +449,11 @@ public class ShuffleManager implements FetcherCallback { } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), inputHost.getSrcPhysicalIndex(), pendingInputsForHost); - LOG.info("Created Fetcher for host: " + inputHost.getHost() - + ", info: " + inputHost.getAdditionalInfo() - + ", with inputs: " + pendingInputsForHost); + if (LOG.isDebugEnabled()) { + LOG.debug("Created Fetcher for host: " + inputHost.getHost() + + ", info: " + inputHost.getAdditionalInfo() + + ", with inputs: " + pendingInputsForHost); + } return fetcherBuilder.build(); } @@ -471,7 +472,7 @@ public class ShuffleManager implements FetcherCallback { } } if (LOG.isDebugEnabled()) { - LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host); + LOG.debug(srcNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host: " + host); } if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { @@ -501,7 +502,9 @@ public class ShuffleManager implements FetcherCallback { public void addCompletedInputWithNoData( InputAttemptIdentifier srcAttemptIdentifier) { InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); - LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete."); + if (LOG.isDebugEnabled()) { + LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". 
Marking as complete."); + } if (!completedInputSet.contains(inputIdentifier)) { synchronized (completedInputSet) { @@ -574,8 +577,10 @@ public class ShuffleManager implements FetcherCallback { } boolean isDone() { - LOG.info("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" + - eventsProcessed.cardinality()); + if (LOG.isDebugEnabled()) { + LOG.debug("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" + + eventsProcessed.cardinality()); + } return ((finalEventId != -1) && (finalEventId + 1) == eventsProcessed.cardinality()); } @@ -631,10 +636,10 @@ public class ShuffleManager implements FetcherCallback { lock.lock(); try { totalBytesShuffledTillNow += fetchedBytes; + logProgress(); } finally { lock.unlock(); } - logProgress(); } } } @@ -751,7 +756,7 @@ public class ShuffleManager implements FetcherCallback { InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) { // TODO NEWTEZ. Implement logic to report fetch failures after a threshold. // For now, reporting immediately. - LOG.info("Fetch failed for src: " + srcAttemptIdentifier + LOG.info(srcNameTrimmed + ": " + "Fetch failed for src: " + srcAttemptIdentifier + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed); failedShufflesCounter.increment(1); @@ -885,16 +890,23 @@ public class ShuffleManager implements FetcherCallback { } } + private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0); + private void logProgress() { - double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); int inputsDone = numCompletedInputs.get(); - long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; - double transferRate = mbs / secsSinceStart; - LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " + - numInputs + - ". 
Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " - + mbpsFormat.format(transferRate) + " MB/s)"); + if (inputsDone > nextProgressLineEventCount.get() || inputsDone == numInputs) { + nextProgressLineEventCount.addAndGet(50); + double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); + long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; + + double transferRate = mbs / secsSinceStart; + LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills.get() + ") of " + + numInputs + + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " + + mbpsFormat.format(transferRate) + " MB/s)"); + + } } @@ -902,15 +914,17 @@ public class ShuffleManager implements FetcherCallback { @Override public void onSuccess(Void result) { - LOG.info("Scheduler thread completed"); + LOG.info(srcNameTrimmed + ": " + "Scheduler thread completed"); } @Override public void onFailure(Throwable t) { if (isShutdown.get()) { - LOG.info("Already shutdown. Ignoring error: " + t); + if (LOG.isDebugEnabled()) { + LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error: " + t); + } } else { - LOG.error("Scheduler failed with error: ", t); + LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t); inputContext.fatalError(t, "Shuffle Scheduler Failed"); } } @@ -939,7 +953,9 @@ public class ShuffleManager implements FetcherCallback { public void onSuccess(FetchResult result) { fetcher.shutdown(); if (isShutdown.get()) { - LOG.info("Already shutdown. Ignoring event from fetcher"); + if (LOG.isDebugEnabled()) { + LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring event from fetcher"); + } } else { Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs(); if (pendingInputs != null && pendingInputs.iterator().hasNext()) { @@ -960,9 +976,11 @@ public class ShuffleManager implements FetcherCallback { // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down. 
fetcher.shutdown(); if (isShutdown.get()) { - LOG.info("Already shutdown. Ignoring error from fetcher: " + t); + if (LOG.isDebugEnabled()) { + LOG.debug(srcNameTrimmed + ": " + "Already shutdown. Ignoring error from fetcher: " + t); + } } else { - LOG.error("Fetcher failed with error: ", t); + LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t); shuffleError = t; inputContext.fatalError(t, "Fetch failed"); doBookKeepingForFetcherComplete(); http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java index 31a8651..604d213 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java @@ -60,11 +60,14 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, private final long maxAvailableTaskMemory; private final long initialMemoryAvailable; + + private final String srcNameTrimmed; private volatile long usedMemory = 0; - public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf, + public SimpleFetchedInputAllocator(String srcNameTrimmed, String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory, long memoryAvailable) { + this.srcNameTrimmed = srcNameTrimmed; this.conf = conf; this.maxAvailableTaskMemory = maxTaskAvailableMemory; this.initialMemoryAvailable = memoryAvailable; @@ -92,8 +95,6 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, 
this.memoryLimit = initialMemoryAvailable; } - LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit); - final float singleShuffleMemoryLimitPercent = conf.getFloat( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT); @@ -107,9 +108,13 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, //TODO: cap it to MAX_VALUE until MemoryFetchedInput can handle > 2 GB this.maxSingleShuffleLimit = (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE); - - LOG.info("SimpleInputManager -> " + "MemoryLimit: " + - this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit); + + LOG.info(srcNameTrimmed + ": " + + "RequestedMemory=" + memReq + + ", AssignedMemorty=" + this.memoryLimit + + ", maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + ); + } @Private @@ -137,7 +142,10 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, fileNameAllocator); } else { this.usedMemory += actualSize; - LOG.info("Used memory after allocating " + actualSize + " : " + usedMemory); + if (LOG.isDebugEnabled()) { + LOG.info(srcNameTrimmed + ": " + "Used memory after allocating " + actualSize + " : " + + usedMemory); + } return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this); } } @@ -196,7 +204,9 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, private synchronized void unreserve(long size) { this.usedMemory -= size; - LOG.info("Used memory after freeing " + size + " : " + usedMemory); + if (LOG.isDebugEnabled()) { + LOG.debug(srcNameTrimmed + ": " + "Used memory after freeing " + size + " : " + usedMemory); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 67b5aa0..0ba37dd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -90,6 +90,8 @@ class FetcherOrderedGrouped extends Thread { volatile HttpURLConnection connection; volatile DataInputStream input; + volatile MapHost assignedHost = null; + HttpConnection httpConnection; HttpConnectionParams httpConnectionParams; @@ -139,15 +141,15 @@ class FetcherOrderedGrouped extends Thread { this.localDiskFetchEnabled = localDiskFetchEnabled; - this.logIdentifier = "fetcher [" + TezUtilsInternal - .cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id; + this.logIdentifier = "fetcher {" + TezUtilsInternal + .cleanVertexName(inputContext.getSourceVertexName()) + "} #" + id; setName(logIdentifier); setDaemon(true); } @VisibleForTesting protected void fetchNext() throws InterruptedException, IOException { - MapHost host = null; + assignedHost = null; try { // If merge is on, block merger.waitForInMemoryMerge(); @@ -156,20 +158,20 @@ class FetcherOrderedGrouped extends Thread { merger.waitForShuffleToMergeMemory(); // Get a host to shuffle from - host = scheduler.getHost(); + assignedHost = scheduler.getHost(); metrics.threadBusy(); - String hostPort = host.getHostIdentifier(); + String hostPort = assignedHost.getHostIdentifier(); if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) { - setupLocalDiskFetch(host); + setupLocalDiskFetch(assignedHost); } else { // Shuffle - copyFromHost(host); + 
copyFromHost(assignedHost); } } finally { cleanupCurrentConnection(false); - if (host != null) { - scheduler.freeHost(host); + if (assignedHost != null) { + scheduler.freeHost(assignedHost); metrics.threadFree(); } } @@ -191,6 +193,9 @@ class FetcherOrderedGrouped extends Thread { public void shutDown() throws InterruptedException { this.stopped = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Fetcher stopped for host " + assignedHost); + } interrupt(); cleanupCurrentConnection(true); try { @@ -276,14 +281,19 @@ class FetcherOrderedGrouped extends Thread { // Setup connection again if disconnected cleanupCurrentConnection(true); if (stopped) { - LOG.info("Not re-establishing connection since Fetcher has been stopped"); + if (LOG.isDebugEnabled()) { + LOG.debug("Not re-establishing connection since Fetcher has been stopped"); + } return; } // Connect with retry if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining))) { if (stopped) { cleanupCurrentConnection(true); - LOG.info("Not reporting connection re-establishment failure since fetcher is stopped"); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Not reporting connection re-establishment failure since fetcher is stopped"); + } return; } failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()}; @@ -294,8 +304,10 @@ class FetcherOrderedGrouped extends Thread { if (failedTasks != null && failedTasks.length > 0) { if (stopped) { - LOG.info("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) + - " since Fetcher has been stopped"); + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) + + " since Fetcher has been stopped"); + } } else { LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks)); for (InputAttemptIdentifier left : failedTasks) { @@ -328,7 +340,9 @@ class FetcherOrderedGrouped extends Thread { connectSucceeded = httpConnection.connect(); if (stopped) { - 
LOG.info("Detected fetcher has been shutdown after connection establishment. Returning"); + if (LOG.isDebugEnabled()) { + LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning"); + } return false; } input = httpConnection.getInputStream(); @@ -336,7 +350,9 @@ class FetcherOrderedGrouped extends Thread { return true; } catch (IOException ie) { if (stopped) { - LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown"); + if (LOG.isDebugEnabled()) { + LOG.debug("Not reporting fetch failure, since an Exception was caught after shutdown"); + } return false; } ioErrs.increment(1); @@ -402,7 +418,9 @@ class FetcherOrderedGrouped extends Thread { InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce); return new InputAttemptIdentifier[] {getNextRemainingAttempt()}; } else { - LOG.info("Already shutdown. Ignoring invalid map id error"); + if (LOG.isDebugEnabled()) { + LOG.debug("Already shutdown. Ignoring invalid map id error"); + } return EMPTY_ATTEMPT_ID_ARRAY; } } @@ -419,8 +437,10 @@ class FetcherOrderedGrouped extends Thread { // the remaining because we dont know where to start reading from. YARN-1773 return new InputAttemptIdentifier[] {getNextRemainingAttempt()}; } else { - LOG.info("Already shutdown. Ignoring invalid map id error. Exception: " + - e.getClass().getName() + ", Message: " + e.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " + + e.getClass().getName() + ", Message: " + e.getMessage()); + } return EMPTY_ATTEMPT_ID_ARRAY; } } @@ -436,7 +456,9 @@ class FetcherOrderedGrouped extends Thread { assert (srcAttemptId != null); return new InputAttemptIdentifier[]{srcAttemptId}; } else { - LOG.info("Already stopped. Ignoring verification failure."); + if (LOG.isDebugEnabled()) { + LOG.debug("Already stopped. 
Ignoring verification failure."); + } return EMPTY_ATTEMPT_ID_ARRAY; } } @@ -455,7 +477,9 @@ class FetcherOrderedGrouped extends Thread { ioErrs.increment(1); scheduler.reportLocalError(e); } else { - LOG.info("Already stopped. Ignoring error from merger.reserve"); + if (LOG.isDebugEnabled()) { + LOG.debug("Already stopped. Ignoring error from merger.reserve"); + } } return EMPTY_ATTEMPT_ID_ARRAY; } @@ -468,16 +492,19 @@ class FetcherOrderedGrouped extends Thread { } // Go! - LOG.info("fetcher#" + id + " about to shuffle output of map " + - mapOutput.getAttemptIdentifier() + " decomp: " + - decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); + if (LOG.isDebugEnabled()) { + LOG.debug("fetcher#" + id + " about to shuffle output of map " + + mapOutput.getAttemptIdentifier() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); + } + if (mapOutput.getType() == Type.MEMORY) { ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString()); } else if (mapOutput.getType() == Type.DISK) { ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), - input, compressedLength, LOG, mapOutput.getAttemptIdentifier().toString()); + input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString()); } else { throw new IOException("Unknown mapOutput type while fetching shuffle data:" + mapOutput.getType()); @@ -496,8 +523,10 @@ class FetcherOrderedGrouped extends Thread { return null; } catch (IOException ioe) { if (stopped) { - LOG.info("Not reporting fetch failure for exception during data copy: [" - + ioe.getClass().getName() + ", " + ioe.getMessage() + "]"); + if (LOG.isDebugEnabled()) { + LOG.debug("Not reporting fetch failure for exception during data copy: [" + + ioe.getClass().getName() + ", " + ioe.getMessage() + "]"); 
+ } cleanupCurrentConnection(true); if (mapOutput != null) { mapOutput.abort(); // Release resources @@ -666,7 +695,10 @@ class FetcherOrderedGrouped extends Thread { LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + host.getHostIdentifier(), e); } else { - LOG.info("Ignoring fetch error during local disk copy since fetcher has already been stopped"); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Ignoring fetch error during local disk copy since fetcher has already been stopped"); + } return; } } http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 2e6ebd9..e25c064 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -58,14 +58,13 @@ import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.sort.impl.IFile; -import org.apache.tez.runtime.library.common.sort.impl.TezMerger; -import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; +import org.apache.tez.runtime.library.common.sort.impl.TezMerger; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; +import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import 
org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles; import org.apache.tez.runtime.library.hadoop.compat.NullProgressable; - /** * Usage. Create instance. setInitialMemoryAvailable(long), configureAndStart() * @@ -219,10 +218,15 @@ public class MergeManager { } else { this.postMergeMemLimit = maxRedBuffer; } - - LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer - + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem=" - + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit); + + if (LOG.isDebugEnabled()) { + LOG.debug( + inputContext.getSourceVertexName() + ": " + "InitialRequest: ShuffleMem=" + memLimit + + ", postMergeMem=" + maxRedBuffer + + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + + ". Updated to: ShuffleMem=" + + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit); + } this.ioSortFactor = conf.getInt( @@ -252,10 +256,11 @@ public class MergeManager { conf.getFloat( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT)); - LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " + + LOG.info(inputContext.getSourceVertexName() + ": MergerManager: memoryLimit=" + memoryLimit + ", " + "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " + "mergeThreshold=" + mergeThreshold + ", " + "ioSortFactor=" + ioSortFactor + ", " + + "postMergeMem=" + postMergeMemLimit + ", " + "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold); if (this.maxSingleShuffleLimit >= this.mergeThreshold) { @@ -310,8 +315,6 @@ public class MergeManager { long memLimit = conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, (long)(maxAvailableTaskMemory * maxInMemCopyUse)); - LOG.info("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse); - float maxRedPer = 
conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT); if (maxRedPer > 1.0 || maxRedPer < 0.0) { @@ -319,7 +322,9 @@ public class MergeManager { } long maxRedBuffer = (long) (maxAvailableTaskMemory * maxRedPer); - LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer); + LOG.info("Initial Memory required for SHUFFLE_BUFFER=" + memLimit + + " based on INPUT_BUFFER_FACTOR=" + maxInMemCopyUse + ", for final merged output=" + + maxRedBuffer + ", using factor: " + maxRedPer); long reqMem = Math.max(maxRedBuffer, memLimit); return reqMem; @@ -373,9 +378,11 @@ public class MergeManager { int fetcher ) throws IOException { if (!canShuffleToMemory(requestedSize)) { - LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + - " is greater than maxSingleShuffleLimit (" + - maxSingleShuffleLimit + ")"); + if (LOG.isDebugEnabled()) { + LOG.debug(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + + " is greater than maxSingleShuffleLimit (" + + maxSingleShuffleLimit + ")"); + } return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf, fetcher, true, mapOutputFile); } @@ -437,9 +444,9 @@ public class MergeManager { public synchronized void closeInMemoryFile(MapOutput mapOutput) { inMemoryMapOutputs.add(mapOutput); LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() - + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() - + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" + - mapOutput); + + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() + + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory + ", mapOutput=" + + mapOutput); commitMemory+= mapOutput.getSize(); @@ -461,7 +468,7 @@ public class MergeManager { private void startMemToDiskMerge() { synchronized 
(inMemoryMerger) { if (!inMemoryMerger.isInProgress()) { - LOG.info("Starting inMemoryMerger's merge since commitMemory=" + + LOG.info(inputContext.getSourceVertexName() + ": " + "Starting inMemoryMerger's merge since commitMemory=" + commitMemory + " > mergeThreshold=" + mergeThreshold + ". Current usedMemory=" + usedMemory); inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); @@ -473,7 +480,7 @@ public class MergeManager { public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) { inMemoryMergedMapOutputs.add(mapOutput); - LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + + LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + inMemoryMergedMapOutputs.size()); } @@ -573,7 +580,7 @@ public class MergeManager { Writer writer = new InMemoryWriter(mergedMapOutputs.getArrayStream()); - LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments + + LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize); // Nothing will be materialized to disk because the sort factor is being @@ -590,7 +597,7 @@ public class MergeManager { TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); writer.close(); - LOG.info(inputContext.getUniqueIdentifier() + + LOG.info(inputContext.getSourceVertexName() + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete."); @@ -642,7 +649,6 @@ public class MergeManager { Path outputPath = mapOutputFile.getInputFileForWrite( srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(), mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX); - LOG.info("Patch..InMemoryMerger outputPath: " + outputPath); Writer writer = null; long outFileLen = 0; @@ -801,7 +807,7 @@ public class MergeManager { final long outputLen = 
localFS.getFileStatus(outputPath).getLen(); closeOnDiskFile(new FileChunk(outputPath, 0, outputLen)); - LOG.info(inputContext.getUniqueIdentifier() + + LOG.info(inputContext.getSourceVertexName() + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." +
