Repository: tez
Updated Branches:
  refs/heads/branch-0.7 c7a946c9c -> cd80d9ab5


http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 1eca9e9..417e66a 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -188,7 +188,7 @@ public class Shuffle implements ExceptionReporter {
     TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
         TaskCounter.SHUFFLE_BYTES_TO_MEM);
     
-    LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+    LOG.info(srcNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " 
inputs" + ", codec: "
         + (codec == null ? "None" : codec.getClass().getName()) + 
         "ifileReadAhead: " + ifileReadAhead);
 
@@ -207,7 +207,8 @@ public class Shuffle implements ExceptionReporter {
           bytesShuffedToDisk,
           bytesShuffedToDiskDirect,
           bytesShuffedToMem,
-          startTime);
+          startTime,
+          srcNameTrimmed);
     this.mergePhaseTime = 
inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
     this.shufflePhaseTime = 
inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
 
@@ -232,14 +233,14 @@ public class Shuffle implements ExceptionReporter {
         sslShuffle);
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + 
srcNameTrimmed + "]").build());
+        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + 
srcNameTrimmed + "}").build());
 
     int configuredNumFetchers = 
         conf.getInt(
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
             
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
     numFetchers = Math.min(configuredNumFetchers, numInputs);
-    LOG.info("Num fetchers being started: " + numFetchers);
+    LOG.info(srcNameTrimmed + ": " + "Num fetchers being started: " + 
numFetchers);
     fetchers = Lists.newArrayListWithCapacity(numFetchers);
     localDiskFetchEnabled = conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
@@ -253,7 +254,7 @@ public class Shuffle implements ExceptionReporter {
     if (!isShutDown.get()) {
       eventHandler.handleEvents(events);
     } else {
-      LOG.info("Ignoring events since already shutdown. EventCount: " + 
events.size());
+      LOG.info(srcNameTrimmed + ": " + "Ignoring events since already 
shutdown. EventCount: " + events.size());
     }
 
   }
@@ -387,7 +388,7 @@ public class Shuffle implements ExceptionReporter {
       }
 
       inputContext.inputIsReady();
-      LOG.info("merge complete for input vertex : " + 
inputContext.getSourceVertexName());
+      LOG.info("merge complete for input vertex : " + srcNameTrimmed);
       return kvIter;
     }
   }
@@ -400,16 +401,16 @@ public class Shuffle implements ExceptionReporter {
         for (FetcherOrderedGrouped fetcher : fetchers) {
           try {
             fetcher.shutDown();
-            LOG.info("Shutdown.." + fetcher.getName() + ", status:" + 
fetcher.isAlive() + ", "
+            LOG.info(srcNameTrimmed + ": " + "Shutdown.." + fetcher.getName() 
+ ", status:" + fetcher.isAlive() + ", "
                 + "isInterrupted:" + fetcher.isInterrupted());
           } catch (InterruptedException e) {
             if (ignoreErrors) {
-              LOG.info("Interrupted while shutting down fetchers. Ignoring.");
+              LOG.info(srcNameTrimmed + ": " + "Interrupted while shutting 
down fetchers. Ignoring.");
             } else {
-              if (ie != null) {
+              if (ie == null) { // remember the first interrupt; later ones are only logged below
                 ie = e;
               } else {
-                LOG.warn(
+                LOG.warn(srcNameTrimmed + ": " +
                     "Ignoring exception while shutting down fetcher since a 
previous one was seen and will be thrown "
                         + e);
               }
@@ -432,7 +433,7 @@ public class Shuffle implements ExceptionReporter {
         scheduler.close();
       } catch (InterruptedException e) {
         if (ignoreErrors) {
-          LOG.info("Interrupted while attempting to close the scheduler during 
cleanup. Ignoring");
+          LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to 
close the scheduler during cleanup. Ignoring");
         } else {
           throw e;
         }
@@ -446,7 +447,7 @@ public class Shuffle implements ExceptionReporter {
         merger.close();
       } catch (Throwable e) {
         if (ignoreErrors) {
-          LOG.info("Exception while trying to shutdown merger, Ignoring", e);
+          LOG.info(srcNameTrimmed + ": " + "Exception while trying to shutdown 
merger, Ignoring", e);
         } else {
           throw e;
         }
@@ -456,11 +457,14 @@ public class Shuffle implements ExceptionReporter {
 
   private void cleanupIgnoreErrors() {
     try {
+      if (eventHandler != null) {
+        eventHandler.logProgress(true);
+      }
       cleanupFetchers(true);
       cleanupShuffleScheduler(true);
       cleanupMerger(true);
     } catch (Throwable t) {
-      LOG.info("Error in cleaning up.., ", t);
+      LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t);
     }
   }
 
@@ -468,6 +472,8 @@ public class Shuffle implements ExceptionReporter {
   public synchronized void reportException(Throwable t) {
     // RunShuffleCallable onFailure deals with ignoring errors on shutdown.
     if (throwable.get() == null) {
+      LOG.info(srcNameTrimmed + ": " + "Setting throwable in reportException 
with message [" + t.getMessage() +
+          "] from thread [" + Thread.currentThread().getName() + "]"); // close the bracket opened before the thread name
       throwable.set(t);
       throwingThreadName = Thread.currentThread().getName();
       // Notify the scheduler so that the reporting thread finds the 
@@ -494,15 +500,15 @@ public class Shuffle implements ExceptionReporter {
   private class ShuffleRunnerFutureCallback implements 
FutureCallback<TezRawKeyValueIterator> {
     @Override
     public void onSuccess(TezRawKeyValueIterator result) {
-      LOG.info("Shuffle Runner thread complete");
+      LOG.info(srcNameTrimmed + ": " + "Shuffle Runner thread complete");
     }
 
     @Override
     public void onFailure(Throwable t) {
       if (isShutDown.get()) {
-        LOG.info("Already shutdown. Ignoring error: ",  t);
+        LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring error");
       } else {
-        LOG.error("ShuffleRunner failed with error", t);
+        LOG.error(srcNameTrimmed + ": " + "ShuffleRunner failed with error", 
t);
         inputContext.fatalError(t, "Shuffle Runner Failed");
         cleanupIgnoreErrors();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 32ac766..8d0c357 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.BitSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.common.TezCommonUtils;
@@ -41,7 +43,7 @@ import 
org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
-public class ShuffleInputEventHandlerOrderedGrouped {
+public class ShuffleInputEventHandlerOrderedGrouped implements 
ShuffleEventHandler {
   
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleInputEventHandlerOrderedGrouped.class);
 
@@ -51,6 +53,11 @@ public class ShuffleInputEventHandlerOrderedGrouped {
   private int maxMapRuntime = 0;
   private final boolean sslShuffle;
 
+  private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
+  private final AtomicInteger numDmeEvents = new AtomicInteger(0);
+  private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
+  private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
+
   public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
                                                 ShuffleScheduler scheduler, 
boolean sslShuffle) {
     this.inputContext = inputContext;
@@ -58,20 +65,36 @@ public class ShuffleInputEventHandlerOrderedGrouped {
     this.sslShuffle = sslShuffle;
   }
 
+  @Override
   public void handleEvents(List<Event> events) throws IOException {
     for (Event event : events) {
       handleEvent(event);
     }
   }
-  
-  
+
+  @Override
+  public void logProgress(boolean updateOnClose) { // one INFO line with the event counters seen so far
+    LOG.info(inputContext.getSourceVertexName() + ": "
+        + "numDmeEventsSeen=" + numDmeEvents.get()
+        + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get()
+        + ", numObsoletionEventsSeen=" + numObsoletionEvents.get()
+        + (updateOnClose ? ", updateOnClose" : "")); // marker is appended only when the flag is set
+  }
+
   private void handleEvent(Event event) throws IOException {
     if (event instanceof DataMovementEvent) {
+      numDmeEvents.incrementAndGet();
       processDataMovementEvent((DataMovementEvent) event);
       scheduler.updateEventReceivedTime();
     } else if (event instanceof InputFailedEvent) {
+      numObsoletionEvents.incrementAndGet();
       processTaskFailedEvent((InputFailedEvent) event);
     }
+    if (numDmeEvents.get() + numObsoletionEvents.get() > 
nextToLogEventCount.get()) {
+      logProgress(false);
+      // Log every 50 events seen.
+      nextToLogEventCount.addAndGet(50);
+    }
   }
 
   private void processDataMovementEvent(DataMovementEvent dmEvent) throws 
IOException {
@@ -82,8 +105,11 @@ public class ShuffleInputEventHandlerOrderedGrouped {
       throw new TezUncheckedException("Unable to parse DataMovementEvent 
payload", e);
     } 
     int partitionId = dmEvent.getSourceIndex();
-    LOG.info("DME srcIdx: " + partitionId + ", targetIdx: " + 
dmEvent.getTargetIndex()
-        + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + 
ShuffleUtils.stringify(shufflePayload));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + 
dmEvent.getTargetIndex()
+          + ", attemptNum: " + dmEvent.getVersion() + ", payload: " +
+          ShuffleUtils.stringify(shufflePayload));
+    }
     // TODO NEWTEZ See if this duration hack can be removed.
     int duration = shufflePayload.getRunDuration();
     if (duration > maxMapRuntime) {
@@ -101,6 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped {
                 "Source partition: " + partitionId + " did not generate any 
data. SrcAttempt: ["
                     + srcAttemptIdentifier + "]. Not fetching.");
           }
+          numDmeEventsNoData.incrementAndGet();
           scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
           return;
         }
@@ -120,10 +147,11 @@ public class ShuffleInputEventHandlerOrderedGrouped {
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
     InputAttemptIdentifier taIdentifier = new 
InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
     scheduler.obsoleteInput(taIdentifier);
-    LOG.info("Obsoleting output of src-task: " + taIdentifier);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Obsoleting output of src-task: " + taIdentifier);
+    }
   }
 
-  // TODO NEWTEZ Handle encrypted shuffle
   @VisibleForTesting
   URI getBaseURI(String host, int port, int partitionId) {
     StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, 
port,

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index cdfd511..b98bc56 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -71,6 +72,7 @@ class ShuffleScheduler {
 
   private boolean[] finishedMaps;
   private final int numInputs;
+  private final String srcNameTrimmed;
   private int remainingMaps;
   private int numFetchedSpills;
   private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
@@ -128,12 +130,14 @@ class ShuffleScheduler {
                           TezCounter failedShuffleCounter,
                           TezCounter bytesShuffledToDisk,
                           TezCounter bytesShuffledToDiskDirect,
-                          TezCounter bytesShuffledToMem, long startTime) {
+                          TezCounter bytesShuffledToMem, long startTime,
+                          String srcNameTrimmed) {
     this.inputContext = inputContext;
     this.numInputs = numberOfInputs;
     abortFailureLimit = Math.max(30, numberOfInputs / 10);
     remainingMaps = numberOfInputs;
     finishedMaps = new boolean[remainingMaps]; // default init to false
+    this.srcNameTrimmed = srcNameTrimmed;
     this.referee = new Referee();
     this.shuffle = shuffle;
     this.shuffledInputsCounter = shuffledInputsCounter;
@@ -312,7 +316,7 @@ class ShuffleScheduler {
       }
 
       if (remainingMaps == 0) {
-        LOG.info("All inputs fetched for input vertex : " + 
inputContext.getSourceVertexName());
+        LOG.info(srcNameTrimmed + ": " + "All inputs fetched for input vertex 
: " + inputContext.getSourceVertexName());
         notifyAll();
       }
 
@@ -330,7 +334,7 @@ class ShuffleScheduler {
       }
     } else {
       // input is already finished. duplicate fetch.
-      LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + 
srcAttemptIdentifier);
+      LOG.warn(srcNameTrimmed + ": " + "Duplicate fetch of input no longer 
needs to be fetched: " + srcAttemptIdentifier);
       // free the resource - specially memory
       
       // If the src does not generate data, output will be null.
@@ -364,19 +368,24 @@ class ShuffleScheduler {
 
   @VisibleForTesting
   void reportExceptionForInput(Exception exception) {
-    LOG.error("Reporting exception for input", exception);
+    LOG.error(srcNameTrimmed + ": " + "Reporting exception for input", 
exception);
     shuffle.reportException(exception);
   }
 
+  private final AtomicInteger nextProgressLineEventCount = new 
AtomicInteger(0);
+
   private void logProgress() {
-    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
     int inputsDone = numInputs - remainingMaps;
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
-    double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills +  
") of " + numInputs +
-        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
-        + mbpsFormat.format(transferRate) + " MB/s)");
+    if (inputsDone > nextProgressLineEventCount.get() || inputsDone == 
numInputs) {
+      nextProgressLineEventCount.addAndGet(50); // raise the logging threshold by 50 completed inputs
+      double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
+      long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 
1;
+
+      double transferRate = mbs / secsSinceStart;
+      LOG.info(srcNameTrimmed + ": " + "copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + 
") of " + numInputs +
+          ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
+          + mbpsFormat.format(transferRate) + " MB/s)");
+    }
   }
 
   public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
@@ -431,7 +440,7 @@ class ShuffleScheduler {
   }
 
   public void reportLocalError(IOException ioe) {
-    LOG.error("Shuffle failed : caused by local error", ioe);
+    LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local 
error", ioe);
     // Shuffle knows how to deal with failures post shutdown via the onFailure 
hook
     shuffle.reportException(ioe);
   }
@@ -444,7 +453,7 @@ class ShuffleScheduler {
       boolean connectError) {
     if ((reportReadErrorImmediately && (readError || connectError))
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info("Reporting fetch failure for InputIdentifier: "
+      LOG.info(srcNameTrimmed + ": " + "Reporting fetch failure for 
InputIdentifier: "
           + srcAttempt + " taskAttemptIdentifier: "
           + TezRuntimeUtils.getTaskAttemptIdentifier(
           inputContext.getSourceVertexName(), 
srcAttempt.getInputIdentifier().getInputIndex(),
@@ -501,7 +510,8 @@ class ShuffleScheduler {
         failureCounts.size() == (numInputs - doneMaps))
         && !reducerHealthy
         && (!reducerProgressedEnough || reducerStalled)) {
-      LOG.error("Shuffle failed with too many fetch failures " + "and 
insufficient progress!"
+      LOG.error(srcNameTrimmed + ": " + "Shuffle failed with too many fetch 
failures " +
+          "and insufficient progress!"
           + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + 
(numInputs - doneMaps)
           + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
           + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
@@ -546,7 +556,7 @@ class ShuffleScheduler {
   
   public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
-    LOG.info("Adding obsolete input: " + srcAttempt);
+    LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
     if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
       //Pipelined shuffle case (where shuffleInfoEventsMap gets populated).
       //Fail fast here.
@@ -565,6 +575,9 @@ class ShuffleScheduler {
 
   public synchronized MapHost getHost() throws InterruptedException {
       while(pendingHosts.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("PendingHosts=" + pendingHosts);
+        }
         wait();
       }
       
@@ -578,7 +591,7 @@ class ShuffleScheduler {
       pendingHosts.remove(host);     
       host.markBusy();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Assigning " + host + " with " + 
host.getNumKnownMapOutputs() +
+        LOG.debug(srcNameTrimmed + ": " + "Assigning " + host + " with " + 
host.getNumKnownMapOutputs() +
             " to " + Thread.currentThread().getName());
       }
       shuffleStart.set(System.currentTimeMillis());
@@ -685,8 +698,10 @@ class ShuffleScheduler {
         notifyAll();
       }
     }
-    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
-             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(host + " freed by " + Thread.currentThread().getName() + " in 
" +
+          (System.currentTimeMillis() - shuffleStart.get()) + "ms");
+    }
   }
 
   public synchronized void resetKnownMaps() {
@@ -752,8 +767,8 @@ class ShuffleScheduler {
    */
   private class Referee extends Thread {
     public Referee() {
-      setName("ShufflePenaltyReferee ["
-          + 
TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setName("ShufflePenaltyReferee {"
+          + 
TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "}");
       setDaemon(true);
     }
 
@@ -780,6 +795,7 @@ class ShuffleScheduler {
   }
   
   public void close() throws InterruptedException {
+    logProgress();
     referee.interrupt();
     referee.join();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index fd62cfa..aba04e0 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -155,7 +155,10 @@ public abstract class ExternalSorter {
 
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
-    LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + 
((initialMemoryAvailable >> 20)));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(outputContext.getDestinationVertexName() + ": Initial Mem 
bytes : " +
+          initialMemoryAvailable + ", in MB=" + ((initialMemoryAvailable >> 
20)));
+    }
     int assignedMb = (int) (initialMemoryAvailable >> 20);
     //Let the overflow checks happen in appropriate sorter impls
     this.availableMemoryMb = assignedMb;
@@ -173,9 +176,13 @@ public abstract class ExternalSorter {
     serializationFactory = new SerializationFactory(this.conf);
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
-    LOG.info("keySerializer=" + keySerializer + "; valueSerializer=" + 
valSerializer
-        + "; comparator=" + (RawComparator) 
ConfigUtils.getIntermediateOutputKeyComparator(conf)
-        + "; conf=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+    LOG.info(outputContext.getDestinationVertexName() + " using: "
+        + "memoryMb=" + assignedMb
+        + ", keySerializerClass=" + keyClass
+        + ", valueSerializerClass=" + valClass // log the class like keyClass above, not the serializer instance
+        + ", comparator=" + (RawComparator) 
ConfigUtils.getIntermediateOutputKeyComparator(conf)
+        + ", partitioner=" + 
conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS)
+        + ", serialization=" + 
conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
 
     //    counters    
     mapOutputByteCounter = 
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
     //    counters    
     mapOutputByteCounter = 
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
@@ -230,8 +237,7 @@ public abstract class ExternalSorter {
     
     // Task outputs
     mapOutputFile = TezRuntimeUtils.instantiateTaskOutputManager(conf, 
outputContext);
-    
-    LOG.info("Instantiating Partitioner: [" + 
conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS) + "]");
+
     
this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS,
 this.partitions);
     this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
     this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, 
outputContext);
@@ -322,9 +328,11 @@ public abstract class ExternalSorter {
         TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " " + 
initialMemRequestMb + " should be "
             + "larger than 0 and should be less than the available task memory 
(MB):" +
             (maxAvailableTaskMemory >> 20));
-    LOG.info("Requested SortBufferSize ("
-        + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): "
-        + initialMemRequestMb);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Requested SortBufferSize ("
+          + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): "
+          + initialMemRequestMb);
+    }
     return reqBytes;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index fe96688..049087b 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -1,4 +1,4 @@
-/**
+  /**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -119,7 +119,9 @@ public class PipelinedSorter extends ExternalSorter {
   PipelinedSorter(OutputContext outputContext, Configuration conf, int 
numOutputs,
       long initialMemoryAvailable, int blkSize) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
-    
+
+    StringBuilder initialSetupLogLine = new StringBuilder("Setting up 
PipelinedSorter for ")
+        .append(outputContext.getDestinationVertexName()).append(": ");
     partitionBits = bitcount(partitions)+1;
 
     boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
@@ -139,10 +141,29 @@ public class PipelinedSorter extends ExternalSorter {
     long usage = sortmb << 20;
     //Divide total memory into different blocks.
     int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize));
-    LOG.info("Number of Blocks : " + numberOfBlocks
-        + ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize + ", 
finalMergeEnabled="
-        + isFinalMergeEnabled() + ", pipelinedShuffle=" + pipelinedShuffle + 
", "
-        + "sendEmptyPartitionDetails=" + sendEmptyPartitionDetails);
+    initialSetupLogLine.append("#blocks=").append(numberOfBlocks);
+    initialSetupLogLine.append(", maxMemUsage=").append(maxMemUsage);
+    initialSetupLogLine.append(", BLOCK_SIZE=").append(blockSize);
+    initialSetupLogLine.append(", 
finalMergeEnabled=").append(isFinalMergeEnabled());
+    initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle);
+    initialSetupLogLine.append(", 
sendEmptyPartitions=").append(sendEmptyPartitionDetails);
+    initialSetupLogLine.append(", 
").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append(
+        "=").append(
+        sortmb);
+
+
+    initialSetupLogLine.append(", UsingHashComparator=");
+    // k/v serialization
+    if(comparator instanceof ProxyComparator) {
+      hasher = (ProxyComparator)comparator;
+      initialSetupLogLine.append(true);
+    } else {
+      hasher = null;
+      initialSetupLogLine.append(false);
+    }
+
+    LOG.info(initialSetupLogLine.toString());
+
     long totalCapacityWithoutMeta = 0;
     for (int i = 0; i < numberOfBlocks; i++) {
       Preconditions.checkArgument(usage > 0, "usage can't be less than zero " 
+ usage);
@@ -156,7 +177,6 @@ public class PipelinedSorter extends ExternalSorter {
     listIterator = bufferList.listIterator();
 
 
-    LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb);
     Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to 
be empty " + bufferList.size());
     span = new SortSpan(listIterator.next(), 1024*1024, 16, this.comparator);
     merger = new SpanMerger(); // SpanIterators are comparable
@@ -166,17 +186,11 @@ public class PipelinedSorter extends ExternalSorter {
                 
TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
     sortmaster = Executors.newFixedThreadPool(sortThreads,
         new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("Sorter [" + TezUtilsInternal
-            .cleanVertexName(outputContext.getDestinationVertexName()) + "] 
#%d")
+        .setNameFormat("Sorter {" + TezUtilsInternal
+            .cleanVertexName(outputContext.getDestinationVertexName()) + "} 
#%d")
         .build());
 
-    // k/v serialization    
-    if(comparator instanceof ProxyComparator) {
-      hasher = (ProxyComparator)comparator;
-      LOG.info("Using the HashComparator");
-    } else {
-      hasher = null;
-    }    
+
     valSerializer.open(span.out);
     keySerializer.open(span.out);
     minSpillsForCombine = 
this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
@@ -217,6 +231,9 @@ public class PipelinedSorter extends ExternalSorter {
       spill();
       stopWatch.stop();
       LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for 
spill " + (stopWatch.elapsedMillis()) + " ms");
+      }
       if (pipelinedShuffle) {
         sendPipelinedShuffleEvents();
       }
@@ -256,7 +273,8 @@ public class PipelinedSorter extends ExternalSorter {
         (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, 
sendEmptyPartitionDetails,
         pathComponent);
     outputContext.sendEvents(events);
-    LOG.info("Added spill event for spill (final update=false), spillId=" + 
(numSpills - 1));
+    LOG.info(outputContext.getDestinationVertexName() +
+        ": Added spill event for spill (final update=false), spillId=" + 
(numSpills - 1));
   }
 
   @Override
@@ -362,11 +380,15 @@ public class PipelinedSorter extends ExternalSorter {
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
     // getSpillFileForWrite with size -1 as the serialized size of KV pair is 
still unknown
     final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
+    Path indexFilename =
+        mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+            * MAP_OUTPUT_INDEX_RECORD_LENGTH);
     spillFilePaths.put(numSpills, filename);
     FSDataOutputStream out = rfs.create(filename, true, 4096);
 
     try {
-      LOG.info("Spilling to " + filename.toString());
+      LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + 
filename.toString() +
+          ", indexFilename=" + indexFilename);
       for (int i = 0; i < partitions; ++i) {
         Writer writer = null;
         try {
@@ -399,10 +421,6 @@ public class PipelinedSorter extends ExternalSorter {
         }
       }
 
-      Path indexFilename =
-          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
-              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-      LOG.info("Spill Index filename:" + indexFilename);
       spillFileIndexPaths.put(numSpills, indexFilename);
       spillRec.writeToFile(indexFilename, conf);
       //TODO: honor cache limits
@@ -433,7 +451,7 @@ public class PipelinedSorter extends ExternalSorter {
 
     try {
       merger.ready(); // wait for all the future results from sort threads
-      LOG.info("Spilling to " + filename.toString());
+      LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + 
filename.toString());
       for (int i = 0; i < partitions; ++i) {
         TezRawKeyValueIterator kvIter = merger.filter(i);
         //write merged output to disk
@@ -484,7 +502,7 @@ public class PipelinedSorter extends ExternalSorter {
   public void flush() throws IOException {
     final String uniqueIdentifier = outputContext.getUniqueIdentifier();
 
-    LOG.info("Starting flush of map output");
+    LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of 
map output");
     span.end();
     merger.add(span.sort(sorter));
     spill();
@@ -495,7 +513,14 @@ public class PipelinedSorter extends ExternalSorter {
 
 
     if(indexCacheList.isEmpty()) {
-      LOG.warn("Index list is empty... returning");
+      /*
+       * If we do not have this check, and if the task gets killed in the 
middle, it can throw
+       * NPE leading to distraction when debugging.
+       */
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(outputContext.getDestinationVertexName()
+            + ": Index list is empty... returning");
+      }
       return;
     }
 
@@ -514,7 +539,7 @@ public class PipelinedSorter extends ExternalSorter {
         ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), 
isLastEvent,
             outputContext, i, indexCacheList.get(i), partitions,
             sendEmptyPartitionDetails, pathComponent);
-        LOG.info("Adding spill event for spill (final update=" + isLastEvent + 
"), spillId=" + i);
+        LOG.info(outputContext.getDestinationVertexName() + ": Adding spill 
event for spill (final update=" + isLastEvent + "), spillId=" + i);
       }
       outputContext.sendEvents(events);
       return;
@@ -533,8 +558,9 @@ public class PipelinedSorter extends ExternalSorter {
 
       sameVolRename(filename, finalOutputFile);
       sameVolRename(indexFilename, finalIndexFile);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + 
finalOutputFile + ", "
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + 
numSpills +
+            ", finalOutputFile=" + finalOutputFile + ", "
             + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + 
", indexFilename=" +
             indexFilename);
       }
@@ -549,7 +575,8 @@ public class PipelinedSorter extends ExternalSorter {
         mapOutputFile.getOutputIndexFileForWrite(0); //TODO
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("numSpills: " + numSpills + ", finalOutputFile:" + 
finalOutputFile + ", finalIndexFile:"
+      LOG.debug(outputContext.getDestinationVertexName() + ": " +
+          "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + 
", finalIndexFile:"
               + finalIndexFile);
     }
 
@@ -705,8 +732,8 @@ public class PipelinedSorter extends ExternalSorter {
       }
       ByteBuffer reserved = source.duplicate();
       reserved.mark();
-      LOG.info("reserved.remaining() = " + reserved.remaining());
-      LOG.info("reserved.metasize = "+ metasize);
+      LOG.info(outputContext.getDestinationVertexName() + ": " + 
"reserved.remaining()=" +
+          reserved.remaining() + ", reserved.metasize=" + metasize);
       reserved.position(metasize);
       kvbuffer = reserved.slice();
       reserved.flip();
@@ -725,7 +752,7 @@ public class PipelinedSorter extends ExternalSorter {
       if(length() > 1) {
         sorter.sort(this, 0, length(), nullProgressable);
       }
-      LOG.info("done sorting span=" + index + ", length=" + length() + ", "
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting 
span=" + index + ", length=" + length() + ", "
           + "time=" + (System.currentTimeMillis() - start));
       return new SpanIterator(this);
     }
@@ -793,7 +820,7 @@ public class PipelinedSorter extends ExternalSorter {
         newSpan = new SortSpan(remaining, items, perItem,
             ConfigUtils.getIntermediateOutputKeyComparator(conf));
         newSpan.index = index+1;
-        LOG.info(String.format("New Span%d.length = %d, perItem = %d", 
newSpan.index, newSpan
+        LOG.info(String.format(outputContext.getDestinationVertexName() + ": " 
+ "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
             .length(), perItem) + ", counter:" + 
mapOutputRecordCounter.getValue());
         return newSpan;
       }
@@ -815,11 +842,11 @@ public class PipelinedSorter extends ExternalSorter {
         return null;
       }
       int perItem = kvbuffer.position()/items;
-      LOG.info(String.format("Span%d.length = %d, perItem = %d", index, 
length(), perItem));
+      LOG.info(outputContext.getDestinationVertexName() + ": " + 
String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
       if(remaining.remaining() < METASIZE+perItem) {
         //Check if we can get the next Buffer from the main buffer list
         if (listIterator.hasNext()) {
-          LOG.info("Getting memory from next block in the list, 
recordsWritten=" +
+          LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting 
memory from next block in the list, recordsWritten=" +
               mapOutputRecordCounter.getValue());
           reinit = true;
           return listIterator.next();
@@ -1113,10 +1140,10 @@ public class PipelinedSorter extends ExternalSorter {
             total += sp.span.length();
             eq += sp.span.getEq();
         }
-        LOG.info("Heap = " + sb.toString());
+        LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + 
sb.toString());
         return true;
       } catch(Exception e) {
-        LOG.info(e.toString());
+        LOG.info(outputContext.getDestinationVertexName() + ": " + 
e.toString());
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index f58b10b..2a10c35 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -129,7 +129,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     final float spillper = this.conf.getFloat(
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
-    final int sortmb = computeSortBufferSize((int) availableMemoryMb);
+    final int sortmb = computeSortBufferSize((int) availableMemoryMb, 
outputContext.getDestinationVertexName());
 
     Preconditions.checkArgument(spillper <= (float) 1.0 && spillper > (float) 
0.0,
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT
@@ -143,7 +143,8 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
     if (confPipelinedShuffle) {
-      LOG.warn(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED  
+ " does not work "
+      LOG.warn(outputContext.getDestinationVertexName() + ": " +
+          TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " 
does not work "
           + "with DefaultSorter. It is supported only with PipelinedSorter.");
     }
 
@@ -163,10 +164,14 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     softLimit = (int)(kvbuffer.length * spillper);
     bufferRemaining = softLimit;
     if (LOG.isInfoEnabled()) {
-      LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + ": " + sortmb);
-      LOG.info("soft limit at " + softLimit);
-      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "; length = " + maxRec + "; 
finalMergeEnabled = " + isFinalMergeEnabled());
+      LOG.info(outputContext.getDestinationVertexName() + ": "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "=" + sortmb
+          + ", soft limit=" + softLimit
+          + ", bufstart=" + bufstart
+          + ", bufvoid=" + bufvoid
+          + ", kvstart=" + kvstart
+          + ", legnth=" + maxRec
+          + ", finalMergeEnabled=" + isFinalMergeEnabled());
     }
 
     // k/v serialization
@@ -176,8 +181,8 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     spillInProgress = false;
     minSpillsForCombine = 
this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
     spillThread.setDaemon(true);
-    spillThread.setName("SpillThread ["
-        + 
TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName() + 
"]"));
+    spillThread.setName("SpillThread {"
+        + 
TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName() + 
"}"));
     spillLock.lock();
     try {
       spillThread.start();
@@ -196,7 +201,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
   }
 
   @VisibleForTesting
-  static int computeSortBufferSize(int availableMemoryMB) {
+  static int computeSortBufferSize(int availableMemoryMB, String logContext) {
 
     if (availableMemoryMB <= 0) {
       throw new 
RuntimeException(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
@@ -204,7 +209,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     }
 
     if (availableMemoryMB > MAX_IO_SORT_MB) {
-      LOG.warn("Scaling down " + 
TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
+      LOG.warn(logContext + ": Scaling down " + 
TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB +
           "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB
           + " (max sort buffer size supported forDefaultSorter)");
     }
@@ -350,7 +355,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
       kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % 
kvmeta.capacity());
       totalKeys++;
     } catch (MapBufferTooSmallException e) {
-      LOG.info("Record too large for in-memory buffer: " + e.getMessage());
+      LOG.info(outputContext.getDestinationVertexName() + ": Record too large 
for in-memory buffer: " + e.getMessage());
       spillSingleRecord(key, value, partition);
       mapOutputRecordCounter.increment(1);
       return;
@@ -369,7 +374,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     // Cast one of the operands to long to avoid integer overflow
     kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % 
kvbuffer.length) / 4;
     if (LOG.isInfoEnabled()) {
-      LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "(EQUATOR) " 
+ pos + " kvi " + kvindex +
           "(" + (kvindex * 4) + ")");
     }
   }
@@ -387,7 +392,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     // Cast one of the operands to long to avoid integer overflow
     kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % 
kvbuffer.length) / 4;
     if (LOG.isInfoEnabled()) {
-      LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "(RESET) 
equator " + e + " kv " + kvstart + "(" +
         (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
     }
   }
@@ -650,13 +655,13 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
         kvend = (kvindex + NMETA) % kvmeta.capacity();
         bufend = bufmark;
         if (LOG.isInfoEnabled()) {
-          LOG.info("Sorting & Spilling map output");
-          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-                   "; bufvoid = " + bufvoid);
-          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
-                   "); kvend = " + kvend + "(" + (kvend * 4) +
-                   "); length = " + (distanceTo(kvend, kvstart,
-                         kvmeta.capacity()) + 1) + "/" + maxRec);
+          LOG.info(
+              outputContext.getDestinationVertexName() + ": " + "Sorting & 
Spilling map output. "
+                  + "bufstart = " + bufstart + ", bufend = " + bufmark + ", 
bufvoid = " + bufvoid
+                  + "; " + "kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+                  + ", kvend = " + kvend + "(" + (kvend * 4) + ")"
+                  + ", length = " + (distanceTo(kvend, kvstart, 
kvmeta.capacity()) + 1) + "/" +
+                  maxRec);
         }
         sortAndSpill();
       }
@@ -707,7 +712,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
             spillLock.unlock();
             sortAndSpill();
           } catch (Throwable t) {
-            LOG.warn("Got an exception in sortAndSpill", t);
+            LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an 
exception in sortAndSpill", t);
             sortSpillException = t;
           } finally {
             spillLock.lock();
@@ -746,13 +751,11 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     bufend = bufmark;
     spillInProgress = true;
     if (LOG.isInfoEnabled()) {
-      LOG.info("Spilling map output");
-      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-               "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
-               "); kvend = " + kvend + "(" + (kvend * 4) +
-               "); length = " + (distanceTo(kvend, kvstart,
-                     kvmeta.capacity()) + 1) + "/" + maxRec);
+      LOG.info(outputContext.getDestinationVertexName() + ": Spilling map 
output."
+          + "bufstart=" + bufstart + ", bufend = " + bufmark + ", bufvoid = " 
+ bufvoid
+          +"; kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+          +", kvend = " + kvend + "(" + (kvend * 4) + ")"
+          + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 
1) + "/" + maxRec);
     }
     spillReady.signal();
   }
@@ -849,7 +852,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
               TezRawKeyValueIterator kvIter =
                 new MRResultIterator(spstart, spindex);
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Running combine processor");
+                LOG.debug(outputContext.getDestinationVertexName() + ": " + 
"Running combine processor");
               }
               runCombineProcessor(kvIter, writer);
             }
@@ -884,7 +887,7 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
         totalIndexCacheMemory +=
           spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
       }
-      LOG.info("Finished spill " + numSpills);
+      LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished 
spill " + numSpills);
       ++numSpills;
       if (!isFinalMergeEnabled()) {
         numShuffleChunks.setValue(numSpills);
@@ -1063,7 +1066,8 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), 
isLastEvent,
         outputContext, index, spillRecord, partitions, 
sendEmptyPartitionDetails, pathComponent);
 
-    LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), 
spillId=" + index);
+    LOG.info(outputContext.getDestinationVertexName() + ": " +
+        "Adding spill event for spill (final update=" + isLastEvent + "), 
spillId=" + index);
 
     if (sendEvent) {
       outputContext.sendEvents(events);
@@ -1213,7 +1217,8 @@ public class DefaultSorter extends ExternalSorter 
implements IndexedSortable {
           segmentList.add(i, s);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + 
parts +
+            LOG.debug(outputContext.getDestinationVertexName() + ": "
+                + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
                 "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
                 indexRecord.getRawLength() + ", " +
                 indexRecord.getPartLength() + ")");

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 37d8be6..4de167a 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -91,6 +91,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
   // Maybe setup a separate statistics class which can be shared between the
   // buffer and the main path instead of having multiple arrays.
 
+  private final String destNameTrimmed;
   private final long availableMemory;
   @VisibleForTesting
   final WrappedBuffer[] buffers;
@@ -149,6 +150,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     super(outputContext, conf, numOutputs);
     Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory 
should be >= 0 bytes");
 
+    this.destNameTrimmed = 
TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
     //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might 
not add much value in
     // this case.  Add it later if needed.
     pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
@@ -170,13 +172,16 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     int maxSingleBufferSizeBytes = conf.getInt(
         
TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 
Integer.MAX_VALUE);
     computeNumBuffersAndSize(maxSingleBufferSizeBytes);
-    LOG.info("Running with numBuffers=" + numBuffers + ", sizePerBuffer=" + 
sizePerBuffer);
+
     availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
     buffers = new WrappedBuffer[numBuffers];
     // Set up only the first buffer to start with.
     buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
     numInitializedBuffers = 1;
-    LOG.info("Initialize Buffer #" + numInitializedBuffers + " with size=" + 
sizePerBuffer);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(destNameTrimmed + ": " + "Initializing Buffer #" +
+          numInitializedBuffers + " with size=" + sizePerBuffer);
+    }
     currentBuffer = buffers[0];
     baos = new ByteArrayOutputStream();
     dos = new DataOutputStream(baos);
@@ -189,8 +194,8 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat(
-                "UnorderedOutSpiller ["
-                    + 
TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + 
"]")
+                "UnorderedOutSpiller {"
+                    + 
TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + 
"}")
             .build());
     spillExecutor = MoreExecutors.listeningDecorator(executor);
     numRecordsPerPartition = new int[numPartitions];
@@ -213,7 +218,12 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       skipBuffers = false;
       writer = null;
     }
-    LOG.info("pipelinedShuffle=" + pipelinedShuffle + ", skipBuffers=" + 
skipBuffers);
+    LOG.info(destNameTrimmed + ": "
+        + "numBuffers=" + numBuffers
+        + ", sizePerBuffer=" + sizePerBuffer
+        + ", skipBuffers=" + skipBuffers
+        + ", pipelinedShuffle=" + pipelinedShuffle
+        + ", numPartitions=" + numPartitions);
   }
 
   private void computeNumBuffersAndSize(int bufferLimit) {
@@ -320,7 +330,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       currentBuffer.reset();
     } else {
       // Update overall stats
-      LOG.info("Moving to next buffer and triggering spill");
+      LOG.info(destNameTrimmed + ": " + "Moving to next buffer and triggering 
spill");
       updateGlobalStats(currentBuffer);
 
       pendingSpillCount.incrementAndGet();
@@ -419,10 +429,10 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       spillResult = new SpillResult(compressedLength, this.wrappedBuffer);
 
       handleSpillIndex(spillPathDetails, spillRecord);
-      LOG.info("Finished spill " + spillIndex);
+      LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Spill=" + spillIndex + ", indexPath="
+        LOG.debug(destNameTrimmed + ": " + "Spill=" + spillIndex + ", 
indexPath="
             + spillPathDetails.indexFilePath + ", outputPath=" + 
spillPathDetails.outputFilePath);
       }
       return spillResult;
@@ -460,7 +470,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     isShutdown.set(true);
     spillLock.lock();
     try {
-      LOG.info("Waiting for all spills to complete : Pending : " + 
pendingSpillCount.get());
+      LOG.info(destNameTrimmed + ": " + "Waiting for all spills to complete : 
Pending : " + pendingSpillCount.get());
       while (pendingSpillCount.get() != 0 && spillException == null) {
         spillInProgress.await();
       }
@@ -468,7 +478,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       spillLock.unlock();
     }
     if (spillException != null) {
-      LOG.error("Error during spill, throwing");
+      LOG.error(destNameTrimmed + ": " + "Error during spill, throwing");
       // Assuming close will be called on the same thread as the write
       cleanup();
       currentBuffer.cleanup();
@@ -479,7 +489,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
         throw new IOException(spillException);
       }
     } else {
-      LOG.info("All spills complete");
+      LOG.info(destNameTrimmed + ": " + "All spills complete");
       // Assuming close will be called on the same thread as the write
       cleanup();
 
@@ -686,7 +696,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       for (int i = 0; i < numPartitions; i++) {
         long segmentStart = out.getPos();
         if (numRecordsPerPartition[i] == 0) {
-          LOG.info("Skipping partition: " + i + " in final merge since it has 
no records");
+          LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in 
final merge since it has no records");
           continue;
         }
         writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
@@ -738,7 +748,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     }
     finalSpillRecord.writeToFile(finalIndexPath, conf);
     fileOutputBytesCounter.increment(indexFileSizeEstimate);
-    LOG.info("Finished final spill after merging : " + numSpills.get() + " 
spills");
+    LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " 
+ numSpills.get() + " spills");
   }
 
   private void writeLargeRecord(final Object key, final Object value, final 
int partition)
@@ -790,9 +800,9 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
 
       sendPipelinedEventForSpill(emptyPartitions, spillIndex, false);
 
-      LOG.info("Finished writing large record of size " + outSize + " to spill 
file " + spillIndex);
+      LOG.info(destNameTrimmed + ": " + "Finished writing large record of size 
" + outSize + " to spill file " + spillIndex);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("LargeRecord Spill=" + spillIndex + ", indexPath="
+        LOG.debug(destNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + 
", indexPath="
             + spillPathDetails.indexFilePath + ", outputPath="
             + spillPathDetails.outputFilePath);
       }
@@ -901,10 +911,10 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
           pathComponent, emptyPartitions);
 
-      LOG.info("Adding spill event for spill (final update=" + isFinalUpdate + 
"), spillId=" + spillNumber);
+      LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final 
update=" + isFinalUpdate + "), spillId=" + spillNumber);
       outputContext.sendEvents(Collections.singletonList(compEvent));
     } catch (IOException e) {
-      LOG.error("Error in sending pipelined events", e);
+      LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", 
e);
       outputContext.fatalError(e, "Error in sending pipelined events");
     }
   }
@@ -924,7 +934,6 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
 
     @Override
     public void onSuccess(SpillResult result) {
-      LOG.info("Spill# " + spillNumber + " complete.");
       spilledSize += result.spillSize;
 
       sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, 
spillNumber, false);
@@ -934,7 +943,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
         availableBuffers.add(result.wrappedBuffer);
 
       } catch (Throwable e) {
-        LOG.error("Failure while attempting to reset buffer after spill", e);
+        LOG.error(destNameTrimmed + ": " + "Failure while attempting to reset 
buffer after spill", e);
         outputContext.fatalError(e, "Failure while attempting to reset buffer 
after spill");
       }
 
@@ -959,7 +968,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     public void onFailure(Throwable t) {
       // spillException setup to throw an exception back to the user. Requires 
synchronization.
       // Consider removing it in favor of having Tez kill the task
-      LOG.error("Failure while spilling to disk", t);
+      LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t);
       spillException = t;
       outputContext.fatalError(t, "Failure while spilling to disk");
       spillLock.lock();

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index d784fcd..fa8630d 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -129,8 +129,10 @@ public class OrderedGroupedKVInput extends 
AbstractLogicalInput {
       List<Event> pending = new LinkedList<Event>();
       pendingEvents.drainTo(pending);
       if (pending.size() > 0) {
-        LOG.info("NoAutoStart delay in processing first event: "
-            + (System.currentTimeMillis() - firstEventReceivedTime));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NoAutoStart delay in processing first event: "
+              + (System.currentTimeMillis() - firstEventReceivedTime));
+        }
         shuffle.handleEvents(pending);
       }
       isStarted.set(true);
@@ -274,10 +276,16 @@ public class OrderedGroupedKVInput extends 
AbstractLogicalInput {
   protected synchronized void createValuesIterator()
       throws IOException {
     // Not used by ReduceProcessor
-    vIter = new ValuesIterator(rawIter,
-        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
-        ConfigUtils.getIntermediateInputKeyClass(conf),
-        ConfigUtils.getIntermediateInputValueClass(conf), conf, 
inputKeyCounter, inputValueCounter);
+    RawComparator rawComparator = 
ConfigUtils.getIntermediateInputKeyComparator(conf);
+    Class<?> keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    Class<?> valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    LOG.info(getContext().getSourceVertexName() + ": " + "creating 
ValuesIterator with "
+        + "comparator=" + rawComparator.getClass().getName()
+        + ", keyClass=" + keyClass.getName()
+        + ", valClass=" + valClass.getName());
+
+    vIter = new ValuesIterator(rawIter, rawComparator, keyClass, valClass,
+        conf, inputKeyCounter, inputValueCounter);
 
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 62fa9a5..f5c8091 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.common.TezUtilsInternal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -131,7 +132,8 @@ public class UnorderedKVInput extends AbstractLogicalInput {
       ifileBufferSize = conf.getInt("io.file.buffer.size",
           TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
 
-      this.inputManager = new 
SimpleFetchedInputAllocator(getContext().getUniqueIdentifier(), conf,
+      this.inputManager = new SimpleFetchedInputAllocator(
+          
TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()), 
getContext().getUniqueIdentifier(), conf,
           getContext().getTotalMemoryAvailableToTask(),
           memoryUpdateCallbackHandler.getMemoryAssigned());
 
@@ -149,8 +151,10 @@ public class UnorderedKVInput extends AbstractLogicalInput 
{
       List<Event> pending = new LinkedList<Event>();
       pendingEvents.drainTo(pending);
       if (pending.size() > 0) {
-        LOG.info("NoAutoStart delay in processing first event: "
-            + (System.currentTimeMillis() - firstEventReceivedTime));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getContext().getSourceVertexName() + ": " + "NoAutoStart 
delay in processing first event: "
+              + (System.currentTimeMillis() - firstEventReceivedTime));
+        }
         inputEventHandler.handleEvents(pending);
       }
       isStarted.set(true);
@@ -207,6 +211,10 @@ public class UnorderedKVInput extends AbstractLogicalInput {
 
   @Override
   public synchronized List<Event> close() throws Exception {
+    if (this.inputEventHandler != null) {
+      this.inputEventHandler.logProgress(true);
+    }
+
     if (this.shuffleManager != null) {
       this.shuffleManager.shutdown();
     }
@@ -217,7 +225,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     long inputRecords = getContext().getCounters()
         .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue();
     getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
-    
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 6227fb9..2b4c0f4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -127,7 +127,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
       if (pipelinedShuffle) {
         if (finalMergeEnabled) {
-          LOG.info("Disabling final merge as "
+          LOG.info(getContext().getDestinationVertexName() + " disabling final merge as "
               + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is enabled.");
           finalMergeEnabled = false;
           conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
@@ -185,8 +185,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       this.endTime = System.nanoTime();
       returnEvents = generateEvents();
     } else {
-      LOG.warn(
-          "Attempting to close output {} of type {} before it was started. Generating empty events",
+      LOG.warn(getContext().getDestinationVertexName() +
+          ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = generateEmptyEvents();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 08e6ec0..674f2d5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -104,7 +104,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
       this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1,
           memoryUpdateCallbackHandler.getMemoryAssigned());
       isStarted.set(true);
-      LOG.info(this.getClass().getSimpleName() + " started. MemoryAssigned="
+      LOG.info(getContext().getDestinationVertexName() + " started. MemoryAssigned="
           + memoryUpdateCallbackHandler.getMemoryAssigned());
     }
   }
@@ -127,8 +127,8 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
       //TODO: Do we need to support sending payloads via events?
       returnEvents = kvWriter.close();
     } else {
-      LOG.warn(
-          "Attempting to close output {} of type {} before it was started. Generating empty events",
+      LOG.warn(getContext().getDestinationVertexName() +
+          ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = new LinkedList<Event>();
       ShuffleUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 38450ee..c60923f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -104,8 +104,8 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     if (isStarted.get()) {
       returnEvents = kvWriter.close();
     } else {
-      LOG.warn(
-          "Attempting to close output {} of type {} before it was started. Generating empty events",
+      LOG.warn(getContext().getDestinationVertexName() +
+          ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = new LinkedList<Event>();
       ShuffleUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
index ffa9429..2f89b0f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
@@ -52,7 +52,7 @@ public class TestSimpleFetchedInputAllocator {
     long inMemThreshold = (long) (bufferPercent * jvmMax);
     LOG.info("InMemThreshold: " + inMemThreshold);
 
-    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(UUID.randomUUID().toString(),
+    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator("srcName", UUID.randomUUID().toString(),
         conf, Runtime.getRuntime().maxMemory(), inMemThreshold);
 
     long requestSize = (long) (0.4f * inMemThreshold);

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index eed9fd8..6c000ff 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -155,7 +155,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         bytesShuffedToDisk,
         bytesShuffedToDiskDirect,
         bytesShuffedToMem,
-        System.currentTimeMillis());
+        System.currentTimeMillis(),
+        "srcNameTrimmed");
     scheduler = spy(realScheduler);
     handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
     mergeManager = mock(MergeManager.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/cd80d9ab/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index bda2fdd..ecc44a7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -213,19 +213,19 @@ public class TestDefaultSorter {
   public void testSortMBLimits() throws Exception {
 
     assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
-        DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB);
+        DefaultSorter.computeSortBufferSize(4096, "") == DefaultSorter.MAX_IO_SORT_MB);
     assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB,
-        DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB);
-    assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024);
+        DefaultSorter.computeSortBufferSize(2047, "") == DefaultSorter.MAX_IO_SORT_MB);
+    assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024, "") == 1024);
 
     try {
-      DefaultSorter.computeSortBufferSize(0);
+      DefaultSorter.computeSortBufferSize(0, "");
       fail("Should have thrown error for setting buffer size to 0");
     } catch(RuntimeException re) {
     }
 
     try {
-      DefaultSorter.computeSortBufferSize(-100);
+      DefaultSorter.computeSortBufferSize(-100, "");
       fail("Should have thrown error for setting buffer size to negative value");
     } catch(RuntimeException re) {
     }

Reply via email to