Repository: tez
Updated Branches:
  refs/heads/master 3b669f895 -> 00508f898


TEZ-2643. Minimize number of empty spills in Pipelined Sorter (Saikat via 
rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/00508f89
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/00508f89
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/00508f89

Branch: refs/heads/master
Commit: 00508f8989566c1a8723b51c8621c20789b05969
Parents: 3b669f8
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Sep 9 14:55:44 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed Sep 9 14:55:44 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/sort/impl/PipelinedSorter.java       | 52 ++++++++++++++------
 .../common/sort/impl/TestPipelinedSorter.java   | 18 +++++++
 3 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/00508f89/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd04fb4..2439370 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2643. Minimize number of empty spills in Pipelined Sorter
   TEZ-2783. Refactor analyzers to extend TezAnalyzerBase
   TEZ-2784. optimize TaskImpl.isFinished()
   TEZ-2788. Allow TezAnalyzerBase to parse SimpleHistory logs

http://git-wip-us.apache.org/repos/asf/tez/blob/00508f89/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index b35efc7..c4b2b3d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -215,10 +215,10 @@ public class PipelinedSorter extends ExternalSorter {
       stopWatch.start();
       // sort in the same thread, do not wait for the thread pool
       merger.add(span.sort(sorter));
-      spill();
+      boolean ret = spill(true);
       stopWatch.stop();
       LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms");
-      if (pipelinedShuffle) {
+      if (pipelinedShuffle && ret) {
         sendPipelinedShuffleEvents();
       }
       //safe to reset the iterator
@@ -425,28 +425,34 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
-  public void spill() throws IOException {
-    // create spill file
-    final long size = capacity +
-        + (partitions * APPROX_HEADER_LENGTH);
-    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-    final Path filename =
-      mapOutputFile.getSpillFileForWrite(numSpills, size);
-    spillFilePaths.put(numSpills, filename);
-    FSDataOutputStream out = rfs.create(filename, true, 4096);
-
+  public boolean spill(boolean ignoreEmptySpills) throws IOException {
+    FSDataOutputStream out = null;
     try {
       try {
-        merger.ready(); // wait for all the future results from sort threads
+        boolean ret = merger.ready();
+        // if the merger has nothing to merge (ready() returned false) and
+        // ignoreEmptySpills is true, return without creating a spill file
+        if (!ret && ignoreEmptySpills){
+          return false;
+        }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         LOG.info("Interrupted while waiting for mergers to complete");
         throw new IOInterruptedException("Interrupted while waiting for 
mergers to complete", e);
       }
+
+      // create spill file
+      final long size = capacity +
+          + (partitions * APPROX_HEADER_LENGTH);
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      final Path filename =
+        mapOutputFile.getSpillFileForWrite(numSpills, size);
+      spillFilePaths.put(numSpills, filename);
+      out = rfs.create(filename, true, 4096);
       LOG.info("Spilling to " + filename.toString());
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
-          return;
+          return false;
         }
         TezRawKeyValueIterator kvIter = merger.filter(i);
         //write merged output to disk
@@ -489,8 +495,11 @@ public class PipelinedSorter extends ExternalSorter {
         //No final merge. Set the number of files offered via shuffle-handler
         numShuffleChunks.setValue(numSpills);
       }
+      return true;
     } finally {
-      out.close();
+      if (out != null) {
+        out.close();
+      }
     }
   }
 
@@ -524,7 +533,15 @@ public class PipelinedSorter extends ExternalSorter {
       LOG.info("Starting flush of map output");
       span.end();
       merger.add(span.sort(sorter));
-      spill();
+      // Always force a spill in flush(), even if it would be empty:
+      // case 1: if no keys were written before flush() was called,
+      //   we still need at least one (possibly empty) spill.
+      // case 2: with pipelined shuffle, the last key written is not
+      //   known until flush() is called, so flush()->spill() must
+      //   always spill so that a pipelined shuffle event marked as
+      //   the last event can be sent.
+      // Hence spill() is called with ignoreEmptySpills = false here.
+      spill(false);
       sortmaster.shutdown();
 
       //safe to clean up
@@ -1158,6 +1175,9 @@ public class PipelinedSorter extends ExternalSorter {
         }
 
         StringBuilder sb = new StringBuilder();
+        if (heap.size() == 0) {
+          return false;
+        }
         for(SpanIterator sp: heap) {
             sb.append(sp.toString());
             sb.append(",");

http://git-wip-us.apache.org/repos/asf/tez/blob/00508f89/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index ebec304..92163c4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -147,6 +147,24 @@ public class TestPipelinedSorter {
   }
 
   @Test
+  public void testEmptyDataWithPipelinedShuffle() throws IOException {
+    this.numOutputs = 1;
+    this.initialAvailableMem = 1 *1024 * 1024;
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 1 << 20);
+    // NOTE(review): this test sets no pipelined-shuffle option itself; confirm disabling final merge is enough to exercise that path.
+    writeData(sorter, 0, 1<<20);
+
+    // Final merge is disabled: no final output file is produced, and SHUFFLE_CHUNK_COUNT should equal the number of spills.
+    assertTrue(sorter.finalOutputFile == null);
+    TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
+    assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    // NOTE(review): the shared conf reset above is skipped if an assertion fails; consider moving it into a finally block.
+  }
+
+  @Test
   public void basicTestWithSmallBlockSize() throws IOException {
     //3 MB key & 3 MB value, whereas block size is just 3 MB
     basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20);

Reply via email to