Repository: tez Updated Branches: refs/heads/master 088c80ce2 -> 567dc28dd
TEZ-3984: Shuffle: Out of Band DME event sending causes errors (Jaume Marhuenda, reviewed by Gopal V) Signed-off-by: Gopal V <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/567dc28d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/567dc28d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/567dc28d Branch: refs/heads/master Commit: 567dc28dd8b2daca6a7f38c4768574bb67df780c Parents: 088c80c Author: Jaume Marhuenda <[email protected]> Authored: Mon Sep 10 12:50:00 2018 -0700 Committer: Gopal V <[email protected]> Committed: Mon Sep 10 12:50:00 2018 -0700 ---------------------------------------------------------------------- .../common/sort/impl/ExternalSorter.java | 6 +++++- .../common/sort/impl/PipelinedSorter.java | 21 ++++++++++++++++---- .../common/sort/impl/dflt/DefaultSorter.java | 4 ++-- .../output/OrderedPartitionedKVOutput.java | 6 +++--- .../common/sort/impl/TestPipelinedSorter.java | 9 +++++++-- 5 files changed, 34 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/567dc28d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index b6fe457..9e65862 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -21,11 +21,14 @@ package org.apache.tez.runtime.library.common.sort.impl; import java.io.File; import java.io.IOException; import java.io.InputStream; +import 
java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; @@ -68,11 +71,12 @@ public abstract class ExternalSorter { private static final Logger LOG = LoggerFactory.getLogger(ExternalSorter.class); - public void close() throws IOException { + public List<Event> close() throws IOException { spillFileIndexPaths.clear(); spillFilePaths.clear(); reportStatistics(); outputContext.notifyProgress(); + return Collections.emptyList(); } public abstract void flush() throws IOException; http://git-wip-us.apache.org/repos/asf/tez/blob/567dc28d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 7915662..028dd2f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -123,6 +123,11 @@ public class PipelinedSorter extends ExternalSorter { private final Deflater deflater; private final String auxiliaryService; + /** + * Stores the spill events to be returned from close() instead of being sent out of band. + */ + private final List<Event> finalEvents; + // TODO Set additional countesr - total bytes written, spills etc.
public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, @@ -236,6 +241,7 @@ public class PipelinedSorter extends ExternalSorter { keySerializer.open(span.out); minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3); deflater = TezCommonUtils.newBestCompressionDeflater(); + finalEvents = Lists.newLinkedList(); } ByteBuffer allocateSpace() { @@ -695,8 +701,6 @@ public class PipelinedSorter extends ExternalSorter { } if (!isFinalMergeEnabled()) { - //Generate events for all spills - List<Event> events = Lists.newLinkedList(); //For pipelined shuffle, previous events are already sent. Just generate the last event alone int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0; @@ -705,13 +709,12 @@ public class PipelinedSorter extends ExternalSorter { for (int i = startIndex; i < endIndex; i++) { boolean isLastEvent = (i == numSpills - 1); String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); - ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, + ShuffleUtils.generateEventOnSpill(finalEvents, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater); LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } - outputContext.sendEvents(events); return; } @@ -850,6 +853,16 @@ public class PipelinedSorter extends ExternalSorter { } } + /** + * Closes the sorter and returns the accumulated spill events (the caller sends them). + * @return the spill events to be returned by the output; empty if none were generated. + * @throws IOException if closing the parent sorter fails.
+ */ + public final List<Event> close() throws IOException { + super.close(); + return finalEvents; + } + private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator { int getPartition(); http://git-wip-us.apache.org/repos/asf/tez/blob/567dc28d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 557a538..9b5a43c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -753,10 +753,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab } @Override - public void close() throws IOException { - super.close(); + public List<Event> close() throws IOException { kvbuffer = null; kvmeta = null; + return super.close(); } boolean isClosed() { http://git-wip-us.apache.org/repos/asf/tez/blob/567dc28d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 7d3e0b4..32a4f4d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -181,12 +181,12 @@ public class OrderedPartitionedKVOutput 
extends AbstractLogicalOutput { @Override public synchronized List<Event> close() throws IOException { - List<Event> returnEvents = null; + List<Event> returnEvents = Lists.newLinkedList(); if (sorter != null) { sorter.flush(); - sorter.close(); + returnEvents.addAll(sorter.close()); this.endTime = System.nanoTime(); - returnEvents = generateEvents(); + returnEvents.addAll(generateEvents()); sorter = null; } else { LOG.warn(getContext().getDestinationVertexName() + http://git-wip-us.apache.org/repos/asf/tez/blob/567dc28d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 727f8ac..bd7f585 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -59,12 +59,14 @@ import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; @@ -402,12 +404,15 @@ public class TestPipelinedSorter { initialAvailableMem); //Write 100 keys each of size 10 - writeData(sorter, 10000, 100); + writeData(sorter, 10000, 100, false); + sorter.flush(); + List<Event> events = sorter.close(); //final merge is disabled. Final output file would not be populated in this case. 
assertTrue(sorter.finalOutputFile == null); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); - verify(outputContext, times(1)).sendEvents(anyListOf(Event.class)); + verify(outputContext, times(0)).sendEvents(any()); + assertTrue(events.size() > 0); } @Test
