Repository: tez
Updated Branches:
  refs/heads/master d777f455b -> f7feaa72b


TEZ-3877. Delete unordered spill files once merge is done (Jason Lowe via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f7feaa72
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f7feaa72
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f7feaa72

Branch: refs/heads/master
Commit: f7feaa72b4fc42676b54e9581165439e9c6d3df7
Parents: d777f45
Author: Jonathan Eagles <[email protected]>
Authored: Tue Jan 16 11:22:10 2018 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Tue Jan 16 11:22:10 2018 -0600

----------------------------------------------------------------------
 .../writers/UnorderedPartitionedKVWriter.java   | 21 +++++++++++++++++---
 .../TestUnorderedPartitionedKVWriter.java       | 17 ++++++++++++----
 2 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f7feaa72/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 6ea0385..f4ebc97 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -111,8 +111,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   WrappedBuffer currentBuffer;
   private final FileSystem rfs;
 
-  private final List<SpillInfo> spillInfoList = Collections
-      .synchronizedList(new ArrayList<SpillInfo>());
+  @VisibleForTesting
+  final List<SpillInfo> spillInfoList = Collections.synchronizedList(new 
ArrayList<SpillInfo>());
 
   private final ListeningExecutorService spillExecutor;
 
@@ -1039,12 +1039,26 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       if (out != null) {
         out.close();
       }
+      deleteIntermediateSpills();
     }
     finalSpillRecord.writeToFile(finalIndexPath, conf);
     fileOutputBytesCounter.increment(indexFileSizeEstimate);
     LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " 
+ numSpills.get() + " spills");
   }
 
+  private void deleteIntermediateSpills() {
+    // Best-effort removal of every per-spill file once merged into the final output; errors only logged
+    synchronized (spillInfoList) { // Collections.synchronizedList must be locked manually while iterating
+      for (SpillInfo spill : spillInfoList) {
+        try {
+          rfs.delete(spill.outPath, false); // non-recursive; NOTE(review): a false result is not logged
+        } catch (IOException e) {
+          LOG.warn("Unable to delete intermediate spill " + spill.outPath, e);
+        }
+      }
+    }
+  }
+
   private void writeLargeRecord(final Object key, final Object value, final 
int partition)
       throws IOException {
     numAdditionalSpillsCounter.increment(1);
@@ -1359,7 +1373,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private static class SpillInfo {
+  @VisibleForTesting
+  static class SpillInfo {
     final TezSpillRecord spillRecord;
     final Path outPath;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f7feaa72/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index f1cea7e..ae396cb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -54,6 +54,7 @@ import com.google.protobuf.ByteString;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import 
org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo;
 import 
org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
 import org.roaringbitmap.RoaringBitmap;
@@ -1238,13 +1239,21 @@ public class TestUnorderedPartitionedKVWriter {
     Path outputFilePath = kvWriter.finalOutPath;
     Path spillFilePath = kvWriter.finalIndexPath;
 
-    if (numRecordsWritten > 0) {
-      assertTrue(localFs.exists(outputFilePath));
-      assertTrue(localFs.exists(spillFilePath));
-    } else {
+    if (numRecordsWritten <= 0) {
       return;
     }
 
+    assertTrue(localFs.exists(outputFilePath));
+    assertTrue(localFs.exists(spillFilePath));
+
+    // verify no intermediate spill files have been left around
+    synchronized (kvWriter.spillInfoList) {
+      for (SpillInfo spill : kvWriter.spillInfoList) {
+        assertFalse("lingering intermediate spill file " + spill.outPath,
+            localFs.exists(spill.outPath));
+      }
+    }
+
     // Special case for 0 records.
     TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
     DataInputBuffer keyBuffer = new DataInputBuffer();

Reply via email to