Repository: tez
Updated Branches:
  refs/heads/master d777f455b -> f7feaa72b
TEZ-3877. Delete unordered spill files once merge is done (Jason Lowe via jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f7feaa72
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f7feaa72
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f7feaa72

Branch: refs/heads/master
Commit: f7feaa72b4fc42676b54e9581165439e9c6d3df7
Parents: d777f45
Author: Jonathan Eagles <[email protected]>
Authored: Tue Jan 16 11:22:10 2018 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Tue Jan 16 11:22:10 2018 -0600

----------------------------------------------------------------------
 .../writers/UnorderedPartitionedKVWriter.java | 21 +++++++++++++++++---
 .../TestUnorderedPartitionedKVWriter.java | 17 ++++++++++++----
 2 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/f7feaa72/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 6ea0385..f4ebc97 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -111,8 +111,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   WrappedBuffer currentBuffer;
   private final FileSystem rfs;
 
-  private final List<SpillInfo> spillInfoList = Collections
-      .synchronizedList(new ArrayList<SpillInfo>());
+  @VisibleForTesting
+  final List<SpillInfo> spillInfoList = Collections.synchronizedList(new ArrayList<SpillInfo>());
 
   private final ListeningExecutorService spillExecutor;
 
@@ -1039,12 +1039,26 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       if (out != null) {
         out.close();
       }
+      deleteIntermediateSpills();
     }
     finalSpillRecord.writeToFile(finalIndexPath, conf);
     fileOutputBytesCounter.increment(indexFileSizeEstimate);
     LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills");
   }
 
+  private void deleteIntermediateSpills() {
+    // Delete the intermediate spill files
+    synchronized (spillInfoList) {
+      for (SpillInfo spill : spillInfoList) {
+        try {
+          rfs.delete(spill.outPath, false);
+        } catch (IOException e) {
+          LOG.warn("Unable to delete intermediate spill " + spill.outPath, e);
+        }
+      }
+    }
+  }
+
   private void writeLargeRecord(final Object key, final Object value, final int partition)
       throws IOException {
     numAdditionalSpillsCounter.increment(1);
@@ -1359,7 +1373,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private static class SpillInfo {
+  @VisibleForTesting
+  static class SpillInfo {
     final TezSpillRecord spillRecord;
     final Path outPath;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f7feaa72/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index f1cea7e..ae396cb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -54,6 +54,7 @@ import com.google.protobuf.ByteString;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
 import org.roaringbitmap.RoaringBitmap;
@@ -1238,13 +1239,21 @@ public class TestUnorderedPartitionedKVWriter {
     Path outputFilePath = kvWriter.finalOutPath;
     Path spillFilePath = kvWriter.finalIndexPath;
 
-    if (numRecordsWritten > 0) {
-      assertTrue(localFs.exists(outputFilePath));
-      assertTrue(localFs.exists(spillFilePath));
-    } else {
+    if (numRecordsWritten <= 0) {
       return;
     }
 
+    assertTrue(localFs.exists(outputFilePath));
+    assertTrue(localFs.exists(spillFilePath));
+
+    // verify no intermediate spill files have been left around
+    synchronized (kvWriter.spillInfoList) {
+      for (SpillInfo spill : kvWriter.spillInfoList) {
+        assertFalse("lingering intermediate spill file " + spill.outPath,
+            localFs.exists(spill.outPath));
+      }
+    }
+
     // Special case for 0 records.
     TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
     DataInputBuffer keyBuffer = new DataInputBuffer();
