Repository: tez Updated Branches: refs/heads/master 7ed7025ad -> 983ceeee1
http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index 18c8302..0e1fe9f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -144,7 +144,7 @@ public class Shuffle implements ExceptionReporter { TezCounter mergedMapOutputsCounter = inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS); - LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: " + LOG.info(srcNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " inputs" + ", codec: " + (codec == null ? "None" : codec.getClass().getName()) + "ifileReadAhead: " + ifileReadAhead); @@ -190,7 +190,7 @@ public class Shuffle implements ExceptionReporter { sslShuffle); ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build()); + .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build()); executor = MoreExecutors.listeningDecorator(rawExecutor); @@ -201,7 +201,7 @@ public class Shuffle implements ExceptionReporter { if (!isShutDown.get()) { eventHandler.handleEvents(events); } else { - LOG.info("Ignoring events since already shutdown. EventCount: " + events.size()); + LOG.info(srcNameTrimmed + ": " + "Ignoring events since already shutdown. EventCount: " + events.size()); } } @@ -327,7 +327,7 @@ public class Shuffle implements ExceptionReporter { } inputContext.inputIsReady(); - LOG.info("merge complete for input vertex : " + inputContext.getSourceVertexName()); + LOG.info("merge complete for input vertex : " + srcNameTrimmed); return kvIter; } } @@ -337,7 +337,7 @@ public class Shuffle implements ExceptionReporter { cleanupShuffleScheduler(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring"); + LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the scheduler during cleanup. Ignoring"); } } @@ -355,13 +355,13 @@ public class Shuffle implements ExceptionReporter { if (ignoreErrors) { //Reset the status Thread.currentThread().interrupt(); - LOG.info("Interrupted while attempting to close the merger during cleanup. Ignoring"); + LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the merger during cleanup. Ignoring"); } else { throw e; } } catch (Throwable e) { if (ignoreErrors) { - LOG.info("Exception while trying to shutdown merger, Ignoring", e); + LOG.info(srcNameTrimmed + ": " + "Exception while trying to shutdown merger, Ignoring", e); } else { throw e; } @@ -371,10 +371,13 @@ public class Shuffle implements ExceptionReporter { private void cleanupIgnoreErrors() { try { + if (eventHandler != null) { + eventHandler.logProgress(true); + } cleanupShuffleSchedulerIgnoreErrors(); cleanupMerger(true); } catch (Throwable t) { - LOG.info("Error in cleaning up.., ", t); + LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t); } } @@ -383,7 +386,7 @@ public class Shuffle implements ExceptionReporter { public synchronized void reportException(Throwable t) { // RunShuffleCallable onFailure deals with ignoring errors on shutdown. if (throwable.get() == null) { - LOG.info("Setting throwable in reportException with message [" + t.getMessage() + + LOG.info(srcNameTrimmed + ": " + "Setting throwable in reportException with message [" + t.getMessage() + "] from thread [" + Thread.currentThread().getName()); throwable.set(t); throwingThreadName = Thread.currentThread().getName(); @@ -409,15 +412,15 @@ public class Shuffle implements ExceptionReporter { private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> { @Override public void onSuccess(TezRawKeyValueIterator result) { - LOG.info("Shuffle Runner thread complete"); + LOG.info(srcNameTrimmed + ": " + "Shuffle Runner thread complete"); } @Override public void onFailure(Throwable t) { if (isShutDown.get()) { - LOG.info("Already shutdown. Ignoring error"); + LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring error"); } else { - LOG.error("ShuffleRunner failed with error", t); + LOG.error(srcNameTrimmed + ": " + "ShuffleRunner failed with error", t); // In case of an abort / Interrupt - the runtime makes sure that this is ignored. inputContext.fatalError(t, "Shuffle Runner Failed"); cleanupIgnoreErrors(); http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 9481e65..e0473b3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.net.URI; import java.util.BitSet; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.common.TezCommonUtils; @@ -41,7 +43,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem import com.google.protobuf.InvalidProtocolBufferException; -public class ShuffleInputEventHandlerOrderedGrouped { +public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandler { private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerOrderedGrouped.class); @@ -51,6 +53,11 @@ public class ShuffleInputEventHandlerOrderedGrouped { private int maxMapRuntime = 0; private final boolean sslShuffle; + private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); + private final AtomicInteger numDmeEvents = new AtomicInteger(0); + private final AtomicInteger numObsoletionEvents = new AtomicInteger(0); + private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0); + public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, ShuffleScheduler scheduler, boolean sslShuffle) { this.inputContext = inputContext; @@ -58,20 +65,36 @@ public class ShuffleInputEventHandlerOrderedGrouped { this.sslShuffle = sslShuffle; } + @Override public void handleEvents(List<Event> events) throws IOException { for (Event event : events) { handleEvent(event); } } - - + + @Override + public void logProgress(boolean updateOnClose) { + LOG.info(inputContext.getSourceVertexName() + ": " + + "numDmeEventsSeen=" + numDmeEvents.get() + + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get() + + ", numObsoletionEventsSeen=" + numObsoletionEvents.get() + + (updateOnClose == true ? ", updateOnClose" : "")); + } + private void handleEvent(Event event) throws IOException { if (event instanceof DataMovementEvent) { + numDmeEvents.incrementAndGet(); processDataMovementEvent((DataMovementEvent) event); scheduler.updateEventReceivedTime(); } else if (event instanceof InputFailedEvent) { + numObsoletionEvents.incrementAndGet(); processTaskFailedEvent((InputFailedEvent) event); } + if (numDmeEvents.get() + numObsoletionEvents.get() > nextToLogEventCount.get()) { + logProgress(false); + // Log every 50 events seen. + nextToLogEventCount.addAndGet(50); + } } private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException { @@ -82,8 +105,11 @@ public class ShuffleInputEventHandlerOrderedGrouped { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } int partitionId = dmEvent.getSourceIndex(); - LOG.info("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex() - + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload)); + if (LOG.isDebugEnabled()) { + LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex() + + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + + ShuffleUtils.stringify(shufflePayload)); + } // TODO NEWTEZ See if this duration hack can be removed. int duration = shufflePayload.getRunDuration(); if (duration > maxMapRuntime) { @@ -101,6 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped { "Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + srcAttemptIdentifier + "]. Not fetching."); } + numDmeEventsNoData.incrementAndGet(); scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null); return; } @@ -120,10 +147,11 @@ public class ShuffleInputEventHandlerOrderedGrouped { private void processTaskFailedEvent(InputFailedEvent ifEvent) { InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion()); scheduler.obsoleteInput(taIdentifier); - LOG.info("Obsoleting output of src-task: " + taIdentifier); + if (LOG.isDebugEnabled()) { + LOG.debug("Obsoleting output of src-task: " + taIdentifier); + } } - // TODO NEWTEZ Handle encrypted shuffle @VisibleForTesting URI getBaseURI(String host, int port, int partitionId) { StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 26464bb..f45ca35 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -207,7 +207,6 @@ class ShuffleScheduler { TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT); numFetchers = Math.min(configuredNumFetchers, numInputs); - LOG.info("Num fetchers determined to be: " + numFetchers); localDiskFetchEnabled = conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, @@ -260,7 +259,7 @@ class ShuffleScheduler { ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build()); + .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build()); this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5); @@ -293,7 +292,8 @@ class ShuffleScheduler { + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + ", abortFailureLimit=" + abortFailureLimit + ", maxMapRuntime=" + maxMapRuntime - + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); + + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce + + ", numFetchers=" + numFetchers); } public void start() throws Exception { @@ -305,6 +305,8 @@ class ShuffleScheduler { public void close() throws InterruptedException { if (!isShutdown.getAndSet(true)) { + logProgress(); + // Notify and interrupt the waiting scheduler thread synchronized (this) { notifyAll(); @@ -518,19 +520,24 @@ class ShuffleScheduler { @VisibleForTesting void reportExceptionForInput(Exception exception) { - LOG.error("Reporting exception for input", exception); + LOG.error(srcNameTrimmed + ": " + "Reporting exception for input", exception); exceptionReporter.reportException(exception); } + private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0); + private void logProgress() { - double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); int inputsDone = numInputs - remainingMaps.get(); - long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; - - double transferRate = mbs / secsSinceStart; - LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs + - ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " - + mbpsFormat.format(transferRate) + " MB/s)"); + if (inputsDone > nextProgressLineEventCount.get() || inputsDone == numInputs || isShutdown.get()) { + nextProgressLineEventCount.addAndGet(50); + double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); + long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; + + double transferRate = mbs / secsSinceStart; + LOG.info("copy(" + inputsDone + " (spillsFetched=" + numFetchedSpills + ") of " + numInputs + + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " + + mbpsFormat.format(transferRate) + " MB/s)"); + } } public synchronized void copyFailed(InputAttemptIdentifier srcAttempt, @@ -585,7 +592,7 @@ class ShuffleScheduler { } public void reportLocalError(IOException ioe) { - LOG.error("Shuffle failed : caused by local error", ioe); + LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", ioe); // Shuffle knows how to deal with failures post shutdown via the onFailure hook exceptionReporter.reportException(ioe); } @@ -598,7 +605,7 @@ class ShuffleScheduler { boolean connectError) { if ((reportReadErrorImmediately && (readError || connectError)) || ((failures % maxFetchFailuresBeforeReporting) == 0)) { - LOG.info("Reporting fetch failure for InputIdentifier: " + LOG.info(srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils.getTaskAttemptIdentifier( inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), @@ -655,7 +662,8 @@ class ShuffleScheduler { failureCounts.size() == (numInputs - doneMaps)) && !reducerHealthy && (!reducerProgressedEnough || reducerStalled)) { - LOG.error("Shuffle failed with too many fetch failures " + "and insufficient progress!" + LOG.error(srcNameTrimmed + ": " + "Shuffle failed with too many fetch failures " + + "and insufficient progress!" + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps) + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough=" + reducerProgressedEnough + ", reducerStalled=" + reducerStalled); @@ -700,7 +708,7 @@ class ShuffleScheduler { public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) { // The incoming srcAttempt does not contain a path component. - LOG.info("Adding obsolete input: " + srcAttempt); + LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt); if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) { //Pipelined shuffle case (where shuffleInfoEventsMap gets populated). //Fail fast here. @@ -719,7 +727,9 @@ class ShuffleScheduler { public synchronized MapHost getHost() throws InterruptedException { while (pendingHosts.isEmpty() && remainingMaps.get() > 0) { - LOG.info("PendingHosts=" + pendingHosts); + if (LOG.isDebugEnabled()) { + LOG.debug("PendingHosts=" + pendingHosts); + } wait(); } @@ -735,7 +745,7 @@ class ShuffleScheduler { pendingHosts.remove(host); host.markBusy(); if (LOG.isDebugEnabled()) { - LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() + + LOG.debug(srcNameTrimmed + ": " + "Assigning " + host + " with " + host.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName()); } shuffleStart.set(System.currentTimeMillis()); @@ -844,8 +854,10 @@ class ShuffleScheduler { notifyAll(); } } - LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + - (System.currentTimeMillis() - shuffleStart.get()) + "ms"); + if (LOG.isDebugEnabled()) { + LOG.debug(host + " freed by " + Thread.currentThread().getName() + " in " + + (System.currentTimeMillis() - shuffleStart.get()) + "ms"); + } } public synchronized void resetKnownMaps() { @@ -896,8 +908,8 @@ class ShuffleScheduler { */ private class Referee extends Thread { public Referee() { - setName("ShufflePenaltyReferee [" - + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "]"); + setName("ShufflePenaltyReferee {" + + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "}"); setDaemon(true); } @@ -955,7 +967,7 @@ class ShuffleScheduler { ShuffleScheduler.this.wait(); } catch (InterruptedException e) { if (isShutdown.get()) { - LOG.info( + LOG.info(srcNameTrimmed + ": " + "Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop"); Thread.currentThread().interrupt(); break; @@ -968,7 +980,7 @@ class ShuffleScheduler { } if (LOG.isDebugEnabled()) { - LOG.debug("NumCompletedInputs: {}" + (numInputs - remainingMaps.get())); + LOG.debug(srcNameTrimmed + ": " + "NumCompletedInputs: {}" + (numInputs - remainingMaps.get())); } // Ensure there's memory available before scheduling the next Fetcher. @@ -979,7 +991,7 @@ class ShuffleScheduler { mergeManager.waitForShuffleToMergeMemory(); } catch (InterruptedException e) { if (isShutdown.get()) { - LOG.info( + LOG.info(srcNameTrimmed + ": " + "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop"); Thread.currentThread().interrupt(); break; @@ -998,7 +1010,7 @@ class ShuffleScheduler { mapHost = getHost(); // Leads to a wait. } catch (InterruptedException e) { if (isShutdown.get()) { - LOG.info( + LOG.info(srcNameTrimmed + ": " + "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop"); Thread.currentThread().interrupt(); break; @@ -1010,11 +1022,14 @@ class ShuffleScheduler { break; // Check for the exit condition. } if (LOG.isDebugEnabled()) { - LOG.debug("Processing pending host: " + mapHost.toString()); + LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + mapHost.toString()); } if (!isShutdown.get()) { count++; - LOG.info("Scheduling fetch for inputHost: {}", mapHost.getIdentifier()); + if (LOG.isDebugEnabled()) { + LOG.debug(srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}", + mapHost.getIdentifier()); + } FetcherOrderedGrouped fetcherOrderedGrouped = constructFetcherForHost(mapHost); runningFetchers.add(fetcherOrderedGrouped); ListenableFuture<Void> future = fetcherExecutor.submit(fetcherOrderedGrouped); @@ -1063,7 +1078,7 @@ class ShuffleScheduler { public void onSuccess(Void result) { fetcherOrderedGrouped.shutDown(); if (isShutdown.get()) { - LOG.info("Already shutdown. Ignoring fetch complete"); + LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch complete"); } else { doBookKeepingForFetcherComplete(); } @@ -1073,9 +1088,9 @@ class ShuffleScheduler { public void onFailure(Throwable t) { fetcherOrderedGrouped.shutDown(); if (isShutdown.get()) { - LOG.info("Already shutdown. Ignoring fetch complete"); + LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring fetch complete"); } else { - LOG.error("Fetcher failed with error", t); + LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error", t); exceptionReporter.reportException(t); doBookKeepingForFetcherComplete(); } http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index d27d08d..aa521ea 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -169,7 +169,10 @@ public abstract class ExternalSorter { rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw(); - LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable >> 20))); + if (LOG.isDebugEnabled()) { + LOG.debug(outputContext.getDestinationVertexName() + ": Initial Mem bytes : " + + initialMemoryAvailable + ", in MB=" + ((initialMemoryAvailable >> 20))); + } int assignedMb = (int) (initialMemoryAvailable >> 20); //Let the overflow checks happen in appropriate sorter impls this.availableMemoryMb = assignedMb; @@ -187,9 +190,13 @@ public abstract class ExternalSorter { serializationFactory = new SerializationFactory(this.conf); keySerializer = serializationFactory.getSerializer(keyClass); valSerializer = serializationFactory.getSerializer(valClass); - LOG.info("keySerializer=" + keySerializer + "; valueSerializer=" + valSerializer - + "; comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf) - + "; conf=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + LOG.info(outputContext.getDestinationVertexName() + " using: " + + "memoryMb=" + assignedMb + + ", keySerializerClass=" + keyClass + + ", valueSerializerClass=" + valSerializer + + ", comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf) + + ", partitioner=" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS) + + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); // counters mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES); @@ -244,8 +251,7 @@ public abstract class ExternalSorter { // Task outputs mapOutputFile = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext); - - LOG.info("Instantiating Partitioner: [" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS) + "]"); + this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions); this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf); this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext); @@ -339,9 +345,11 @@ public abstract class ExternalSorter { TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " " + initialMemRequestMb + " should be " + "larger than 0 and should be less than the available task memory (MB):" + (maxAvailableTaskMemory >> 20)); - LOG.info("Requested SortBufferSize (" - + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): " - + initialMemRequestMb); + if (LOG.isDebugEnabled()) { + LOG.debug("Requested SortBufferSize (" + + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "): " + + initialMemRequestMb); + } return reqBytes; } http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index c4b2b3d..81f5211 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -1,4 +1,4 @@ -/** + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -120,7 +120,9 @@ public class PipelinedSorter extends ExternalSorter { PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable, int blkSize) throws IOException { super(outputContext, conf, numOutputs, initialMemoryAvailable); - + + StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ") + .append(outputContext.getDestinationVertexName()).append(": "); partitionBits = bitcount(partitions)+1; boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration @@ -140,10 +142,29 @@ public class PipelinedSorter extends ExternalSorter { long usage = sortmb << 20; //Divide total memory into different blocks. int numberOfBlocks = Math.max(1, (int) Math.ceil(1.0 * usage / blockSize)); - LOG.info("Number of Blocks : " + numberOfBlocks - + ", maxMemUsage=" + maxMemUsage + ", BLOCK_SIZE=" + blockSize + ", finalMergeEnabled=" - + isFinalMergeEnabled() + ", pipelinedShuffle=" + pipelinedShuffle + ", " - + "sendEmptyPartitionDetails=" + sendEmptyPartitionDetails); + initialSetupLogLine.append("#blocks=").append(numberOfBlocks); + initialSetupLogLine.append(", maxMemUsage=").append(maxMemUsage); + initialSetupLogLine.append(", BLOCK_SIZE=").append(blockSize); + initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled()); + initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle); + initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails); + initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append( + "=").append( + sortmb); + + + initialSetupLogLine.append(", UsingHashComparator="); + // k/v serialization + if(comparator instanceof ProxyComparator) { + hasher = (ProxyComparator)comparator; + initialSetupLogLine.append(true); + } else { + hasher = null; + initialSetupLogLine.append(false); + } + + LOG.info(initialSetupLogLine.toString()); + long totalCapacityWithoutMeta = 0; for (int i = 0; i < numberOfBlocks; i++) { Preconditions.checkArgument(usage > 0, "usage can't be less than zero " + usage); @@ -157,7 +178,6 @@ public class PipelinedSorter extends ExternalSorter { listIterator = bufferList.listIterator(); - LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " = " + sortmb); Preconditions.checkArgument(listIterator.hasNext(), "Buffer list seems to be empty " + bufferList.size()); span = new SortSpan(listIterator.next(), 1024*1024, 16, this.comparator); merger = new SpanMerger(); // SpanIterators are comparable @@ -167,17 +187,11 @@ public class PipelinedSorter extends ExternalSorter { TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT); sortmaster = Executors.newFixedThreadPool(sortThreads, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Sorter [" + TezUtilsInternal - .cleanVertexName(outputContext.getDestinationVertexName()) + "] #%d") + .setNameFormat("Sorter {" + TezUtilsInternal + .cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d") .build()); - // k/v serialization - if(comparator instanceof ProxyComparator) { - hasher = (ProxyComparator)comparator; - LOG.info("Using the HashComparator"); - } else { - hasher = null; - } + valSerializer.open(span.out); keySerializer.open(span.out); minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3); @@ -217,7 +231,9 @@ public class PipelinedSorter extends ExternalSorter { merger.add(span.sort(sorter)); boolean ret = spill(true); stopWatch.stop(); - LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms"); + if (LOG.isDebugEnabled()) { + LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.elapsedMillis()) + " ms"); + } if (pipelinedShuffle && ret) { sendPipelinedShuffleEvents(); } @@ -257,7 +273,8 @@ public class PipelinedSorter extends ExternalSorter { (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats); outputContext.sendEvents(events); - LOG.info("Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); + LOG.info(outputContext.getDestinationVertexName() + + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); } @Override @@ -363,11 +380,15 @@ public class PipelinedSorter extends ExternalSorter { final TezSpillRecord spillRec = new TezSpillRecord(partitions); // getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1); + Path indexFilename = + mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions + * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillFilePaths.put(numSpills, filename); FSDataOutputStream out = rfs.create(filename, true, 4096); try { - LOG.info("Spilling to " + filename.toString()); + LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() + + ", indexFilename=" + indexFilename); for (int i = 0; i < partitions; ++i) { if (isThreadInterrupted()) { return; @@ -403,10 +424,6 @@ public class PipelinedSorter extends ExternalSorter { } } - Path indexFilename = - mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions - * MAP_OUTPUT_INDEX_RECORD_LENGTH); - LOG.info("Spill Index filename:" + indexFilename); spillFileIndexPaths.put(numSpills, indexFilename); spillRec.writeToFile(indexFilename, conf); //TODO: honor cache limits @@ -437,8 +454,8 @@ public class PipelinedSorter extends ExternalSorter { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOG.info("Interrupted while waiting for mergers to complete"); - throw new IOInterruptedException("Interrupted while waiting for mergers to complete", e); + LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete"); + throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e); } // create spill file @@ -449,7 +466,7 @@ public class PipelinedSorter extends ExternalSorter { mapOutputFile.getSpillFileForWrite(numSpills, size); spillFilePaths.put(numSpills, filename); out = rfs.create(filename, true, 4096); - LOG.info("Spilling to " + filename.toString()); + LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString()); for (int i = 0; i < partitions; ++i) { if (isThreadInterrupted()) { return false; @@ -509,7 +526,7 @@ public class PipelinedSorter extends ExternalSorter { cleanup(); } sortmaster.shutdownNow(); - LOG.info("Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster + LOG.info(outputContext.getDestinationVertexName() + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster .isShutdown() + ", terminated=" + sortmaster.isTerminated()); return true; } @@ -530,7 +547,7 @@ public class PipelinedSorter extends ExternalSorter { } try { - LOG.info("Starting flush of map output"); + LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of map output"); span.end(); merger.add(span.sort(sorter)); // force a spill in flush() @@ -549,7 +566,14 @@ public class PipelinedSorter extends ExternalSorter { if(indexCacheList.isEmpty()) { - LOG.warn("Index list is empty... returning"); + /* + * If we do not have this check, and if the task gets killed in the middle, it can throw + * NPE leading to distraction when debugging. + */ + if (LOG.isDebugEnabled()) { + LOG.debug(outputContext.getDestinationVertexName() + + ": Index list is empty... returning"); + } return; } @@ -567,7 +591,7 @@ public class PipelinedSorter extends ExternalSorter { ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats); - LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); + LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } outputContext.sendEvents(events); return; @@ -586,8 +610,9 @@ public class PipelinedSorter extends ExternalSorter { sameVolRename(filename, finalOutputFile); sameVolRename(indexFilename, finalIndexFile); - if (LOG.isInfoEnabled()) { - LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", " + if (LOG.isDebugEnabled()) { + LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + numSpills + + ", finalOutputFile=" + finalOutputFile + ", " + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + indexFilename); } @@ -608,7 +633,7 @@ public class PipelinedSorter extends ExternalSorter { mapOutputFile.getOutputIndexFileForWrite(0); //TODO if (LOG.isDebugEnabled()) { - LOG.debug( + LOG.debug(outputContext.getDestinationVertexName() + ": " + "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:" + finalIndexFile); } @@ -773,8 +798,8 @@ public class PipelinedSorter extends ExternalSorter { } ByteBuffer reserved = source.duplicate(); reserved.mark(); - LOG.info("reserved.remaining() = " + reserved.remaining()); - LOG.info("reserved.metasize = "+ metasize); + LOG.info(outputContext.getDestinationVertexName() + ": " + "reserved.remaining()=" + + reserved.remaining() + ", reserved.metasize=" + metasize); reserved.position(metasize); kvbuffer = reserved.slice(); reserved.flip(); @@ -793,7 +818,7 @@ public class PipelinedSorter extends ExternalSorter { if(length() > 1) { sorter.sort(this, 0, length(), nullProgressable); } - LOG.info("done sorting span=" + index + ", length=" + length() + ", " + LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", " + "time=" + (System.currentTimeMillis() - start)); return new SpanIterator(this); } @@ -861,7 +886,7 @@ public class PipelinedSorter extends ExternalSorter { newSpan = new SortSpan(remaining, items, perItem, ConfigUtils.getIntermediateOutputKeyComparator(conf)); newSpan.index = index+1; - LOG.info(String.format("New Span%d.length = %d, perItem = %d", newSpan.index, newSpan + LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue()); return newSpan; } @@ -883,11 +908,11 @@ public class PipelinedSorter extends ExternalSorter { return null; } int perItem = kvbuffer.position()/items; - LOG.info(String.format("Span%d.length = %d, perItem = %d", index, length(), perItem)); + LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem)); if(remaining.remaining() < METASIZE+perItem) { //Check if we can get the next Buffer from the main buffer list if (listIterator.hasNext()) { - LOG.info("Getting memory from next block in the list, recordsWritten=" + + LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" + mapOutputRecordCounter.getValue()); reinit = true; return listIterator.next(); @@ -1184,10 +1209,10 @@ public class PipelinedSorter extends ExternalSorter { total += sp.span.length(); eq += sp.span.getEq(); } - LOG.info("Heap = " + sb.toString()); + LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString()); return true; } catch(ExecutionException e) { - LOG.info(e.toString()); + LOG.info(outputContext.getDestinationVertexName() + ": " + e.toString()); return false; } } http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index edc02f3..727f9a2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -130,7 +130,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { final float spillper = this.conf.getFloat( TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT); - final int sortmb = computeSortBufferSize((int) availableMemoryMb); + final int sortmb = computeSortBufferSize((int) availableMemoryMb, outputContext.getDestinationVertexName()); Preconditions.checkArgument(spillper <= (float) 1.0 && spillper > (float) 0.0, TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT @@ -144,7 +144,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); if (confPipelinedShuffle) { - LOG.warn(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work " + LOG.warn(outputContext.getDestinationVertexName() + ": " + + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work " + "with DefaultSorter. It is supported only with PipelinedSorter."); } @@ -164,10 +165,14 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit; if (LOG.isInfoEnabled()) { - LOG.info(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + ": " + sortmb); - LOG.info("soft limit at " + softLimit); - LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid); - LOG.info("kvstart = " + kvstart + "; length = " + maxRec + "; finalMergeEnabled = " + isFinalMergeEnabled()); + LOG.info(outputContext.getDestinationVertexName() + ": " + + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "=" + sortmb + + ", soft limit=" + softLimit + + ", bufstart=" + bufstart + + ", bufvoid=" + bufvoid + + ", kvstart=" + kvstart + + ", legnth=" + maxRec + + ", finalMergeEnabled=" + isFinalMergeEnabled()); } // k/v serialization @@ -177,8 +182,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillInProgress = false; minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3); spillThread.setDaemon(true); - spillThread.setName("SpillThread [" - + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName() + "]")); + spillThread.setName("SpillThread {" + + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName() + "}")); spillLock.lock(); try { spillThread.start(); @@ -200,7 +205,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } @VisibleForTesting - static int computeSortBufferSize(int availableMemoryMB) { + static int computeSortBufferSize(int availableMemoryMB, String logContext) { if (availableMemoryMB <= 0) { throw new RuntimeException(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + @@ -208,7 +213,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } if (availableMemoryMB > MAX_IO_SORT_MB) { - LOG.warn("Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + + LOG.warn(logContext + ": Scaling down " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + "=" + availableMemoryMB + " to " + MAX_IO_SORT_MB + " (max sort buffer size supported forDefaultSorter)"); } @@ -354,7 +359,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity()); totalKeys++; } catch (MapBufferTooSmallException e) { - LOG.info("Record too large for in-memory buffer: " + e.getMessage()); + LOG.info(outputContext.getDestinationVertexName() + ": Record too large for in-memory buffer: " + e.getMessage()); spillSingleRecord(key, value, partition); mapOutputRecordCounter.increment(1); return; @@ -373,7 +378,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { // Cast one of the operands to long to avoid integer overflow kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; if (LOG.isInfoEnabled()) { - LOG.info("(EQUATOR) " + pos + " kvi " + kvindex + + LOG.info(outputContext.getDestinationVertexName() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); } } @@ -391,7 +396,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { // Cast one of the operands to long to avoid integer overflow kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; if (LOG.isInfoEnabled()) { - LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" + + LOG.info(outputContext.getDestinationVertexName() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" + (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); } } @@ -647,7 +652,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillThread.interrupt(); spillThread.join(); } catch (InterruptedException e) { - LOG.info("Spill thread interrupted"); + LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted"); //Reset status Thread.currentThread().interrupt(); throw new IOInterruptedException("Spill failed", e); @@ -656,7 +661,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { @Override public void flush() throws IOException { - LOG.info("Starting flush of map output"); + LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output"); if (Thread.currentThread().isInterrupted()) { /** * Possible that the thread got interrupted when flush was happening or when the flush was @@ -691,13 +696,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { kvend = (kvindex + NMETA) % kvmeta.capacity(); bufend = bufmark; if (LOG.isInfoEnabled()) { - LOG.info("Sorting & Spilling map output"); - LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark + - "; bufvoid = " + bufvoid); - LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) + - "); kvend = " + kvend + "(" + (kvend * 4) + - "); length = " + (distanceTo(kvend, kvstart, - kvmeta.capacity()) + 1) + "/" + maxRec); + LOG.info( + outputContext.getDestinationVertexName() + ": " + "Sorting & Spilling map output. " + + "bufstart = " + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid + + "; " + "kvstart=" + kvstart + "(" + (kvstart * 4) + ")" + + ", kvend = " + kvend + "(" + (kvend * 4) + ")" + + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + + maxRec); } sortAndSpill(); } @@ -743,7 +748,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { spillLock.unlock(); sortAndSpill(); } catch (Throwable t) { - LOG.warn("Got an exception in sortAndSpill", t); + LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t); sortSpillException = t; } finally { spillLock.lock(); @@ -756,7 +761,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { } } } catch (InterruptedException e) { - LOG.info("Spill thread interrupted"); + LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted"); Thread.currentThread().interrupt(); } finally { spillLock.unlock(); @@ -787,13 +792,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { bufend = bufmark; spillInProgress = true; if (LOG.isInfoEnabled()) { - LOG.info("Spilling map output"); - LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark + - "; bufvoid = " + bufvoid); - LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) + - "); kvend = " + kvend + "(" + (kvend * 4) + - "); length = " + (distanceTo(kvend, kvstart, - kvmeta.capacity()) + 1) + "/" + maxRec); + LOG.info(outputContext.getDestinationVertexName() + ": Spilling map output." + + "bufstart=" + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid + +"; kvstart=" + kvstart + "(" + (kvstart * 4) + ")" + +", kvend = " + kvend + "(" + (kvend * 4) + ")" + + ", length = " + (distanceTo(kvend, kvstart, kvmeta.capacity()) + 1) + "/" + maxRec); } spillReady.signal(); } @@ -889,7 +892,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { TezRawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); if (LOG.isDebugEnabled()) { - LOG.debug("Running combine processor"); + LOG.debug(outputContext.getDestinationVertexName() + ": " + "Running combine processor"); } runCombineProcessor(kvIter, writer); } @@ -927,7 +930,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } - LOG.info("Finished spill " + numSpills); + LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished spill " + numSpills); ++numSpills; if (!isFinalMergeEnabled()) { numShuffleChunks.setValue(numSpills); @@ -1113,7 +1116,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, partitionStats); - LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); + LOG.info(outputContext.getDestinationVertexName() + ": " + + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); if (sendEvent) { outputContext.sendEvents(events); @@ -1271,7 +1275,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable { segmentList.add(i, s); if (LOG.isDebugEnabled()) { - LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts + + LOG.debug(outputContext.getDestinationVertexName() + ": " + + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts + "Spill =" + i + "(" + indexRecord.getStartOffset() + "," + indexRecord.getRawLength() + ", " + indexRecord.getPartLength() + ")"); http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 9a98cd1..70b345f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -92,6 +92,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Maybe setup a separate statistics class which can be shared between the // buffer and the main path instead of having multiple arrays. + private final String destNameTrimmed; private final long availableMemory; @VisibleForTesting final WrappedBuffer[] buffers; @@ -150,6 +151,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit super(outputContext, conf, numOutputs); Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes"); + this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()); //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in // this case. Add it later if needed. pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration @@ -171,13 +173,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit int maxSingleBufferSizeBytes = conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, Integer.MAX_VALUE); computeNumBuffersAndSize(maxSingleBufferSizeBytes); - LOG.info("Running with numBuffers=" + numBuffers + ", sizePerBuffer=" + sizePerBuffer); + availableBuffers = new LinkedBlockingQueue<WrappedBuffer>(); buffers = new WrappedBuffer[numBuffers]; // Set up only the first buffer to start with. buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer); numInitializedBuffers = 1; - LOG.info("Initialize Buffer #" + numInitializedBuffers + " with size=" + sizePerBuffer); + if (LOG.isDebugEnabled()) { + LOG.debug(destNameTrimmed + ": " + "Initializing Buffer #" + + numInitializedBuffers + " with size=" + sizePerBuffer); + } currentBuffer = buffers[0]; baos = new ByteArrayOutputStream(); dos = new DataOutputStream(baos); @@ -190,8 +195,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat( - "UnorderedOutSpiller [" - + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "]") + "UnorderedOutSpiller {" + + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "}") .build()); spillExecutor = MoreExecutors.listeningDecorator(executor); numRecordsPerPartition = new int[numPartitions]; @@ -214,7 +219,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit skipBuffers = false; writer = null; } - LOG.info("pipelinedShuffle=" + pipelinedShuffle + ", skipBuffers=" + skipBuffers); + LOG.info(destNameTrimmed + ": " + + "numBuffers=" + numBuffers + + ", sizePerBuffer=" + sizePerBuffer + + ", skipBuffers=" + skipBuffers + + ", pipelinedShuffle=" + pipelinedShuffle + + ", numPartitions=" + numPartitions); } private void computeNumBuffersAndSize(int bufferLimit) { @@ -321,7 +331,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit currentBuffer.reset(); } else { // Update overall stats - LOG.info("Moving to next buffer and triggering spill"); + LOG.info(destNameTrimmed + ": " + "Moving to next buffer and triggering spill"); updateGlobalStats(currentBuffer); pendingSpillCount.incrementAndGet(); @@ -420,10 +430,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit spillResult = new SpillResult(compressedLength, this.wrappedBuffer); handleSpillIndex(spillPathDetails, spillRecord); - LOG.info("Finished spill " + spillIndex); + LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex); if (LOG.isDebugEnabled()) { - LOG.debug("Spill=" + spillIndex + ", indexPath=" + LOG.debug(destNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath=" + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath); } return spillResult; @@ -461,7 +471,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit isShutdown.set(true); spillLock.lock(); try { - LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get()); + LOG.info(destNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get()); while (pendingSpillCount.get() != 0 && spillException == null) { spillInProgress.await(); } @@ -469,7 +479,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit spillLock.unlock(); } if (spillException != null) { - LOG.error("Error during spill, throwing"); + LOG.error(destNameTrimmed + ": " + "Error during spill, throwing"); // Assuming close will be called on the same thread as the write cleanup(); currentBuffer.cleanup(); @@ -480,7 +490,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit throw new IOException(spillException); } } else { - LOG.info("All spills complete"); + LOG.info(destNameTrimmed + ": " + "All spills complete"); // Assuming close will be called on the same thread as the write cleanup(); @@ -687,7 +697,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit for (int i = 0; i < numPartitions; i++) { long segmentStart = out.getPos(); if (numRecordsPerPartition[i] == 0) { - LOG.info("Skipping partition: " + i + " in final merge since it has no records"); + LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records"); continue; } writer = new Writer(conf, out, keyClass, valClass, codec, null, null); @@ -739,7 +749,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } finalSpillRecord.writeToFile(finalIndexPath, conf); fileOutputBytesCounter.increment(indexFileSizeEstimate); - LOG.info("Finished final spill after merging : " + numSpills.get() + " spills"); + LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills"); } private void writeLargeRecord(final Object key, final Object value, final int partition) @@ -791,9 +801,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit sendPipelinedEventForSpill(emptyPartitions, spillIndex, false); - LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillIndex); + LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex); if (LOG.isDebugEnabled()) { - LOG.debug("LargeRecord Spill=" + spillIndex + ", indexPath=" + LOG.debug(destNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath=" + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath); } @@ -902,10 +912,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, pathComponent, emptyPartitions); - LOG.info("Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber); + LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber); outputContext.sendEvents(Collections.singletonList(compEvent)); } catch (IOException e) { - LOG.error("Error in sending pipelined events", e); + LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e); outputContext.fatalError(e, "Error in sending pipelined events"); } } @@ -925,7 +935,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @Override public void onSuccess(SpillResult result) { - LOG.info("Spill# " + spillNumber + " complete."); spilledSize += result.spillSize; sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, spillNumber, false); @@ -935,7 +944,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit availableBuffers.add(result.wrappedBuffer); } catch (Throwable e) { - LOG.error("Failure while attempting to reset buffer after spill", e); + LOG.error(destNameTrimmed + ": " + "Failure while attempting to reset buffer after spill", e); outputContext.fatalError(e, "Failure while attempting to reset buffer after spill"); } @@ -960,7 +969,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit public void onFailure(Throwable t) { // spillException setup to throw an exception back to the user. Requires synchronization. // Consider removing it in favor of having Tez kill the task - LOG.error("Failure while spilling to disk", t); + LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t); spillException = t; outputContext.fatalError(t, "Failure while spilling to disk"); spillLock.lock(); http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 7d887de..880dc2f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -132,8 +132,10 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { List<Event> pending = new LinkedList<Event>(); pendingEvents.drainTo(pending); if (pending.size() > 0) { - LOG.info("NoAutoStart delay in processing first event: " - + (System.currentTimeMillis() - firstEventReceivedTime)); + if (LOG.isDebugEnabled()) { + LOG.debug("NoAutoStart delay in processing first event: " + + (System.currentTimeMillis() - firstEventReceivedTime)); + } shuffle.handleEvents(pending); } isStarted.set(true); @@ -283,10 +285,16 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { protected synchronized void createValuesIterator() throws IOException { // Not used by ReduceProcessor - vIter = new ValuesIterator(rawIter, - (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf), - ConfigUtils.getIntermediateInputKeyClass(conf), - ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter); + RawComparator rawComparator = ConfigUtils.getIntermediateInputKeyComparator(conf); + Class<?> keyClass = ConfigUtils.getIntermediateInputKeyClass(conf); + Class<?> valClass = ConfigUtils.getIntermediateInputValueClass(conf); + LOG.info(getContext().getSourceVertexName() + ": " + "creating ValuesIterator with " + + "comparator=" + rawComparator.getClass().getName() + + ", keyClass=" + keyClass.getName() + + ", valClass=" + valClass.getName()); + + vIter = new ValuesIterator(rawIter, rawComparator, keyClass, valClass, + conf, inputKeyCounter, inputValueCounter); } http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 271eed3..fad164f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.runtime.library.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,7 +133,8 @@ public class UnorderedKVInput extends AbstractLogicalInput { ifileBufferSize = conf.getInt("io.file.buffer.size", TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT); - this.inputManager = new SimpleFetchedInputAllocator(getContext().getUniqueIdentifier(), conf, + this.inputManager = new SimpleFetchedInputAllocator( + TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()), getContext().getUniqueIdentifier(), conf, getContext().getTotalMemoryAvailableToTask(), memoryUpdateCallbackHandler.getMemoryAssigned()); @@ -150,8 +152,10 @@ public class UnorderedKVInput extends AbstractLogicalInput { List<Event> pending = new LinkedList<Event>(); pendingEvents.drainTo(pending); if (pending.size() > 0) { - LOG.info("NoAutoStart delay in processing first event: " - + (System.currentTimeMillis() - firstEventReceivedTime)); + if (LOG.isDebugEnabled()) { + LOG.debug(getContext().getSourceVertexName() + ": " + "NoAutoStart delay in processing first event: " + + (System.currentTimeMillis() - firstEventReceivedTime)); + } inputEventHandler.handleEvents(pending); } isStarted.set(true); @@ -208,6 +212,10 @@ public class UnorderedKVInput extends AbstractLogicalInput { @Override public synchronized List<Event> close() throws Exception { + if (this.inputEventHandler != null) { + this.inputEventHandler.logProgress(true); + } + if (this.shuffleManager != null) { this.shuffleManager.shutdown(); } @@ -218,7 +226,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { long inputRecords = getContext().getCounters() .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue(); getContext().getStatisticsReporter().reportItemsProcessed(inputRecords); - + return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 98d75a9..45b6713 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -126,7 +126,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { if (pipelinedShuffle) { if (finalMergeEnabled) { - LOG.info("Disabling final merge as " + LOG.info(getContext().getDestinationVertexName() + " disabling final merge as " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is enabled."); finalMergeEnabled = false; conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); @@ -184,8 +184,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { this.endTime = System.nanoTime(); returnEvents = generateEvents(); } else { - LOG.warn( - "Attempting to close output {} of type {} before it was started. Generating empty events", + LOG.warn(getContext().getDestinationVertexName() + + ": Attempting to close output {} of type {} before it was started. Generating empty events", getContext().getDestinationVertexName(), this.getClass().getSimpleName()); returnEvents = generateEmptyEvents(); } http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index b50f17d..879c2e0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -104,7 +104,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1, memoryUpdateCallbackHandler.getMemoryAssigned()); isStarted.set(true); - LOG.info(this.getClass().getSimpleName() + " started. MemoryAssigned=" + LOG.info(getContext().getDestinationVertexName() + " started. MemoryAssigned=" + memoryUpdateCallbackHandler.getMemoryAssigned()); } } @@ -127,8 +127,8 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { //TODO: Do we need to support sending payloads via events? returnEvents = kvWriter.close(); } else { - LOG.warn( - "Attempting to close output {} of type {} before it was started. Generating empty events", + LOG.warn(getContext().getDestinationVertexName() + + ": Attempting to close output {} of type {} before it was started. Generating empty events", getContext().getDestinationVertexName(), this.getClass().getSimpleName()); returnEvents = new LinkedList<Event>(); ShuffleUtils http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 7498627..90c0ed4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -104,8 +104,8 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { if (isStarted.get()) { returnEvents = kvWriter.close(); } else { - LOG.warn( - "Attempting to close output {} of type {} before it was started. Generating empty events", + LOG.warn(getContext().getDestinationVertexName() + + ": Attempting to close output {} of type {} before it was started. Generating empty events", getContext().getDestinationVertexName(), this.getClass().getSimpleName()); returnEvents = new LinkedList<Event>(); ShuffleUtils http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java index ffa9429..2f89b0f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java @@ -52,7 +52,7 @@ public class TestSimpleFetchedInputAllocator { long inMemThreshold = (long) (bufferPercent * jvmMax); LOG.info("InMemThreshold: " + inMemThreshold); - SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(UUID.randomUUID().toString(), + SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator("srcName", UUID.randomUUID().toString(), conf, Runtime.getRuntime().maxMemory(), inMemThreshold); long requestSize = (long) (0.4f * inMemThreshold); http://git-wip-us.apache.org/repos/asf/tez/blob/983ceeee/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index c22e605..b531464 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -214,19 +214,19 @@ public class TestDefaultSorter { public void testSortMBLimits() throws Exception { assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB, - DefaultSorter.computeSortBufferSize(4096) == DefaultSorter.MAX_IO_SORT_MB); + DefaultSorter.computeSortBufferSize(4096, "") == DefaultSorter.MAX_IO_SORT_MB); assertTrue("Expected " + DefaultSorter.MAX_IO_SORT_MB, - DefaultSorter.computeSortBufferSize(2047) == DefaultSorter.MAX_IO_SORT_MB); - assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024) == 1024); + DefaultSorter.computeSortBufferSize(2047, "") == DefaultSorter.MAX_IO_SORT_MB); + assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024, "") == 1024); try { - DefaultSorter.computeSortBufferSize(0); + DefaultSorter.computeSortBufferSize(0, ""); fail("Should have thrown error for setting buffer size to 0"); } catch(RuntimeException re) { } try { - DefaultSorter.computeSortBufferSize(-100); + DefaultSorter.computeSortBufferSize(-100, ""); fail("Should have thrown error for setting buffer size to negative value"); } catch(RuntimeException re) { }
