Repository: tez
Updated Branches:
  refs/heads/master bbe0f96a7 -> d5ac3b75f


TEZ-3849. Combiner+PipelinedSorter silently drops records (Jacob Tolar via 
kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d5ac3b75
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d5ac3b75
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d5ac3b75

Branch: refs/heads/master
Commit: d5ac3b75f73b908b292f4f7e1a3d619696d957b8
Parents: bbe0f96
Author: Kuhu Shukla <[email protected]>
Authored: Wed Oct 25 11:01:28 2017 -0500
Committer: Kuhu Shukla <[email protected]>
Committed: Wed Oct 25 11:01:28 2017 -0500

----------------------------------------------------------------------
 .../processor/reduce/ReduceProcessor.java       |  6 +++
 .../tez/mapreduce/combine/TestMRCombiner.java   | 11 ++++-
 .../common/sort/impl/PipelinedSorter.java       | 50 ++++++++++++++++++--
 .../library/common/sort/impl/TezMerger.java     | 36 +++++++++-----
 .../sort/impl/TezRawKeyValueIterator.java       | 11 ++++-
 .../common/sort/impl/dflt/DefaultSorter.java    |  6 +++
 .../input/OrderedGroupedInputLegacy.java        |  5 ++
 .../common/sort/impl/TestPipelinedSorter.java   | 39 +++++++++++++++
 8 files changed, 148 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 4b79c78..63b168f 100644
--- 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -309,6 +309,12 @@ public class ReduceProcessor extends MRTask {
       public DataInputBuffer getValue() throws IOException {
         return rawIter.getValue();
       }
+
+      @Override
+      public boolean hasNext() throws IOException {
+        // Pass-through to the wrapped rawIter; unlike next(), no progress is
+        // reported to the reporter here.
+        return rawIter.hasNext();
+      }
+
       public boolean next() throws IOException {
         boolean ret = rawIter.next();
         reporter.setProgress(rawIter.getProgress().getProgress());

http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
index a796e59..7668d96 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
@@ -152,7 +152,16 @@ public class TestMRCombiner {
 
     @Override
     public boolean next() throws IOException {
-      if (i++ < keys.length - 1) {
+      // Ask hasNext() first so that i is only incremented when another entry
+      // is reported.
+      boolean hasNext = hasNext();
+      if (hasNext) {
+        i += 1;
+      }
+
+      return hasNext;
+    }
+
+    public boolean hasNext() throws IOException {
+      if (i < (keys.length -  1)) {
         return true;
       }
       return false;

http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 88d10d0..07c2fe2 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -574,15 +574,14 @@ public class PipelinedSorter extends ExternalSorter {
         //write merged output to disk
         long segmentStart = out.getPos();
         Writer writer = null;
-        boolean hasNext = kvIter.next();
+        boolean hasNext = kvIter.hasNext();
         if (hasNext || !sendEmptyPartitionDetails) {
           writer = new Writer(conf, out, keyClass, valClass, codec,
               spilledRecordsCounter, null, merger.needsRLE());
         }
         if (combiner == null) {
-          while (hasNext) {
+          while (kvIter.next()) {
             writer.append(kvIter.getKey(), kvIter.getValue());
-            hasNext = kvIter.next();
           }
         } else {          
           if (hasNext) {
@@ -842,6 +841,7 @@ public class PipelinedSorter extends ExternalSorter {
 
   private interface PartitionedRawKeyValueIterator extends 
TezRawKeyValueIterator {
     int getPartition();
+    Integer peekPartition();
   }
 
   private static class BufferStreamWrapper extends OutputStream
@@ -1129,6 +1129,11 @@ public class PipelinedSorter extends ExternalSorter {
       return true;
     }
 
+    @Override
+    public boolean hasNext() {
+      // size() is (maxindex - kvindex), so entries remain only while kvindex
+      // is still short of maxindex; equality means the span is used up.
+      return kvindex < maxindex;
+    }
+
     public void close() {
     }
 
@@ -1146,6 +1151,14 @@ public class PipelinedSorter extends ExternalSorter {
       return partition;
     }
 
+    /**
+     * Looks ahead at the partition stored for index kvindex + 1 without
+     * moving the cursor.
+     *
+     * @return that partition, or <code>null</code> when no entry follows
+     */
+    public Integer peekPartition() {
+      // Compare the indices directly: size() is (maxindex - kvindex), so
+      // nothing follows once kvindex has reached maxindex, and reading
+      // kvindex + 1 at that point would go past the last entry.
+      if (kvindex >= maxindex) {
+        return null;
+      }
+      return kvmeta.get(span.offsetFor(kvindex + 1) + PARTITION);
+    }
+
     @SuppressWarnings("unused")
     public int size() {
       return (maxindex - kvindex);
@@ -1264,6 +1277,23 @@ public class PipelinedSorter extends ExternalSorter {
       return false;
     }
 
+    /**
+     * Checks whether the upcoming partition value matches the partition this
+     * filter was last reset to.
+     *
+     * @return <code>true</code> only when a candidate partition value is
+     *         available and its top partitionBits bits equal partition
+     * @throws IOException propagated from the wrapped iterator
+     */
+    @Override
+    public boolean hasNext() throws IOException {
+      final Integer candidate;
+      if (dirty) {
+        // dirty: use the partition iter currently reports; iter.hasNext()
+        // is not consulted in this case.
+        candidate = iter.getPartition();
+      } else if (iter.hasNext()) {
+        candidate = iter.peekPartition();
+      } else {
+        return false;
+      }
+
+      if (candidate == null) {
+        return false;
+      }
+      return (candidate >>> (32 - partitionBits)) == partition;
+    }
+
     public void reset(int partition) {
       this.partition = partition;
     }
@@ -1403,6 +1433,20 @@ public class PipelinedSorter extends ExternalSorter {
       return false;
     }
 
+    @Override
+    public boolean hasNext() {
+      // The only condition checked is that peek() yields something.
+      return peek() != null;
+    }
+
+    /**
+     * Reports the partition of the element peek() returns, leaving it in
+     * place.
+     *
+     * @return that partition, or <code>null</code> when peek() has nothing
+     */
+    public Integer peekPartition() {
+      SpanIterator head = peek();
+      if (head == null) {
+        return null;
+      }
+      return head.getPartition();
+    }
+
     public DataInputBuffer getKey() { return key; }
     public DataInputBuffer getValue() { return value; }
     public int getPartition() { return partition; }

http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 6eb9a40..0e18ead 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -638,19 +638,10 @@ public class TezMerger {
     }
 
     public boolean next() throws IOException {
-      if (size() == 0)
+      if (!hasNext()) {
         return false;
-
-      if (minSegment != null) {
-        //minSegment is non-null for all invocations of next except the first
-        //one. For the first invocation, the priority queue is ready for use
-        //but for the subsequent invocations, first adjust the queue 
-        adjustPriorityQueue(minSegment);
-        if (size() == 0) {
-          minSegment = null;
-          return false;
-        }
       }
+
       minSegment = top();
       long startPos = minSegment.getPosition();
       KeyValueBuffer nextKey = minSegment.getKey();
@@ -1036,6 +1027,24 @@ public class TezMerger {
       return (hasNext != null) && (hasNext == KeyState.SAME_KEY);
     }
 
+    /**
+     * Reports whether another record is available, performing the queue
+     * adjustment that next() relies on before it reads top().
+     *
+     * NOTE(review): this is not a read-only check. Whenever minSegment is
+     * non-null it calls adjustPriorityQueue(minSegment), and next() calls
+     * this method itself, so hasNext() followed by next() runs that
+     * adjustment twice for one record - confirm whether callers can hit that.
+     *
+     * @return <code>false</code> when size() is 0 before or after the
+     *         adjustment, <code>true</code> otherwise
+     * @throws IOException presumably from adjustPriorityQueue - TODO confirm
+     */
+    public boolean hasNext() throws IOException {
+      if (size() == 0)
+        return false;
+
+      if (minSegment != null) {
+        //minSegment is non-null for all invocations of next except the first
+        //one. For the first invocation, the priority queue is ready for use
+        //but for the subsequent invocations, first adjust the queue
+        adjustPriorityQueue(minSegment);
+        if (size() == 0) {
+          minSegment = null;
+          return false;
+        }
+      }
+
+      return true;
+    }
+
   }
 
   private static class EmptyIterator implements TezRawKeyValueIterator {
@@ -1061,6 +1070,11 @@ public class TezMerger {
     }
 
     @Override
+    public boolean hasNext() throws IOException {
+      // Unconditionally false: this implementation never reports a record.
+      return false;
+    }
+
+    @Override
     public void close() throws IOException {
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
index 4e2ce3a..683c9b9 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
@@ -55,7 +55,16 @@ public interface TezRawKeyValueIterator {
    * @throws IOException
    */
   boolean next() throws IOException;
-  
+
+  /**
+   * Reports whether any items are left in the iterator.
+   *
+   * @return <code>true</code> if a call to {@link #next()} will succeed,
+   *         <code>false</code> otherwise.
+   * @throws IOException
+   */
+  boolean hasNext() throws IOException;
+
   /** 
    * Closes the iterator so that the underlying streams can be closed.
    * 

http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 268e237..85e0003 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -1122,6 +1122,12 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
       this.end = end;
       current = start - 1;
     }
+
+    @Override
+    public boolean hasNext() throws IOException {
+        // Same comparison next() makes (++current < end), but without
+        // changing current.
+        return (current + 1) < end;
+    }
+
     public boolean next() throws IOException {
       return ++current < end;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
index 6ae156a..b697be5 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
@@ -58,6 +58,11 @@ public class OrderedGroupedInputLegacy extends 
OrderedGroupedKVInput {
         }
 
         @Override
+        public boolean hasNext() throws IOException {
+          // Unconditionally false in this implementation.
+          return false;
+        }
+
+        @Override
         public void close() throws IOException {
         }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d5ac3b75/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index f85272b..d6f6273 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -39,9 +39,11 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.OutputStatisticsReporter;
+import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
+import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import 
org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -439,6 +441,43 @@ public class TestPipelinedSorter {
   }
 
   @Test
+  // Builds a PipelinedSorter with final merge enabled and DummyCombiner set as
+  // the combiner class, writes data through it, then checks the final output
+  // file and the counters.
+  public void testWithCombiner() throws IOException {
+    Configuration conf = getConf();
+    
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
 true);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, 
DummyCombiner.class.getName());
+    this.numOutputs = 5;
+    this.initialAvailableMem = 5 * 1024 * 1024;
+    conf.setInt(TezRuntimeConfiguration
+            .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, 
numOutputs,
+            initialAvailableMem);
+
+    writeData(sorter, 1, 20);
+
+    // Read back whatever the sorter recorded as its final output file.
+    Path outputFile = sorter.finalOutputFile;
+    FileSystem fs = outputFile.getFileSystem(conf);
+    IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, 
false, -1, 4096);
+    // NOTE(review): reader is closed only if verifyData returns normally.
+    verifyData(reader);
+    reader.close();
+
+    verifyCounters(sorter, outputContext);
+  }
+
+  // Combiner used by testWithCombiner: it performs no combining and simply
+  // appends every key/value pair it reads from the iterator to the writer.
+  public static class DummyCombiner implements Combiner {
+    // The context argument is accepted but not used.
+    public DummyCombiner(TaskContext ctx) {
+      // do nothing
+    }
+
+    @Override
+    public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) 
throws InterruptedException, IOException {
+      // Drain rawIter with next() alone; hasNext() is never called here.
+      while (rawIter.next()) {
+        writer.append(rawIter.getKey(), rawIter.getValue());
+      }
+    }
+  }
+
+  @Test
   public void testMultipleSpills_WithRLE() throws IOException {
     Configuration conf = getConf();
     
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
 true);

Reply via email to