Repository: tez
Updated Branches:
  refs/heads/branch-0.7 81b716626 -> 714d8c432


TEZ-3202. Reduce the memory need for jobs with high number of segments (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/714d8c43
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/714d8c43
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/714d8c43

Branch: refs/heads/branch-0.7
Commit: 714d8c4328f5185d9646a820c1271e6139c5b9f4
Parents: 81b7166
Author: Jonathan Eagles <[email protected]>
Authored: Thu Apr 14 15:30:29 2016 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Thu Apr 14 15:30:29 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../shuffle/orderedgrouped/MergeManager.java    |  13 +-
 .../common/sort/impl/PipelinedSorter.java       |   5 +-
 .../library/common/sort/impl/TezMerger.java     | 245 ++++++++++++-------
 .../common/sort/impl/dflt/DefaultSorter.java    |   5 +-
 .../library/common/TestValuesIterator.java      |   2 +-
 .../library/common/sort/impl/TestTezMerger.java |   4 +-
 7 files changed, 170 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/714d8c43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 38bc67e..4033249 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3202. Reduce the memory need for jobs with high number of segments
   TEZ-3188. Move tez.submit.hosts out of TezConfiguration to 
TezConfigurationConstants.
   TEZ-3196. java.lang.InternalError from decompression codec is fatal to a 
task during shuffle
   TEZ-3177. Non-DAG events should use the session domain or no domain if the 
data does not need protection.

http://git-wip-us.apache.org/repos/asf/tez/blob/714d8c43/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 2d519d2..da7bcd8 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -61,6 +61,7 @@ import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import 
org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
@@ -442,7 +443,7 @@ public class MergeManager {
     commitMemory -= size;
     usedMemory -= size;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Notifying unreserve : commitMemory=" + commitMemory + ", 
usedMemory=" + usedMemory
+      LOG.debug("Notifying unreserve : size=" + size + ", commitMemory=" + 
commitMemory + ", usedMemory=" + usedMemory
           + ", mergeThreshold=" + mergeThreshold);
     }
     notifyAll();
@@ -613,7 +614,7 @@ public class MergeManager {
             mergeOutputSize += mo.getSize();
             IFile.Reader reader = new InMemoryReader(MergeManager.this,
                 mo.getAttemptIdentifier(), mo.getMemory(), 0, 
mo.getMemory().length);
-            inMemorySegments.add(new Segment(reader, true,
+            inMemorySegments.add(new Segment(reader,
                 (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null)));
             lastAddedMapOutput = mo;
             it.remove();
@@ -816,7 +817,7 @@ public class MergeManager {
         }
         final Path file = fileChunk.getPath();
         approxOutputSize += size;
-        Segment segment = new Segment(rfs, file, offset, size, codec, 
ifileReadAhead,
+        DiskSegment segment = new DiskSegment(rfs, file, offset, size, codec, 
ifileReadAhead,
             ifileReadAheadLength, ifileBufferSize, preserve);
         inputSegments.add(segment);
       }
@@ -901,7 +902,7 @@ public class MergeManager {
       IFile.Reader reader = new InMemoryReader(MergeManager.this, 
                                                    mo.getAttemptIdentifier(),
                                                    data, 0, (int)size);
-      inMemorySegments.add(new Segment(reader, true, 
+      inMemorySegments.add(new Segment(reader,
                                             (mo.isPrimaryMapOutput() ? 
                                             mergedMapOutputsCounter : null)));
     }
@@ -1063,7 +1064,7 @@ public class MergeManager {
 
       final long fileOffset = fileChunk.getOffset();
       final boolean preserve = fileChunk.isLocalFile();
-      diskSegments.add(new Segment(fs, file, fileOffset, fileLength, codec, 
ifileReadAhead,
+      diskSegments.add(new DiskSegment(fs, file, fileOffset, fileLength, 
codec, ifileReadAhead,
                                    ifileReadAheadLength, ifileBufferSize, 
preserve, counter));
     }
     LOG.info("Merging " + onDisk.length + " files, " +
@@ -1096,7 +1097,7 @@ public class MergeManager {
         return diskMerge;
       }
       finalSegments.add(new Segment(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+            new RawKVIteratorReader(diskMerge, onDiskBytes), null));
     }
     // This is doing nothing but creating an iterator over the segments.
     return TezMerger.merge(job, fs, keyClass, valueClass,

http://git-wip-us.apache.org/repos/asf/tez/blob/714d8c43/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 8e5c598..a2d06a8 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -57,6 +57,7 @@ import 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -607,8 +608,8 @@ public class PipelinedSorter extends ExternalSorter {
         Path spillFilename = spillFilePaths.get(i);
         TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
-        Segment s =
-            new Segment(rfs, spillFilename, indexRecord.getStartOffset(),
+        DiskSegment s =
+            new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(),
                              indexRecord.getPartLength(), codec, 
ifileReadAhead,
                              ifileReadAheadLength, ifileBufferSize, true);
         segmentList.add(i, s);

http://git-wip-us.apache.org/repos/asf/tez/blob/714d8c43/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 745e9d5..a9542e0 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -221,10 +221,119 @@ public class TezMerger {
 
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
-  public static class Segment<K extends Object, V extends Object> {
+  static class KeyValueBuffer {
+    private byte[] buf;
+    private int position;
+    private int length;
+
+    public KeyValueBuffer(byte buf[], int position, int length) {
+      reset(buf, position, length);
+    }
+
+    public void reset(byte[] input, int position, int length) {
+      this.buf = input;
+      this.position = position;
+      this.length = length;
+    }
+
+    public byte[] getData() {
+      return buf;
+    }
+
+    public int getPosition() {
+      return position;
+    }
+
+    public int getLength() {
+      return length;
+    }
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static class Segment {
+    static final byte[] EMPTY_BYTES = new byte[0];
     Reader reader = null;
-    final DataInputBuffer key = new DataInputBuffer();
-    
+    final KeyValueBuffer key = new KeyValueBuffer(EMPTY_BYTES, 0, 0);
+    TezCounter mapOutputsCounter = null;
+
+    public Segment(Reader reader, TezCounter mapOutputsCounter) {
+      this.reader = reader;
+      this.mapOutputsCounter = mapOutputsCounter;
+    }
+
+    void init(TezCounter readsCounter, TezCounter bytesReadCounter) throws 
IOException {
+      if (mapOutputsCounter != null) {
+        mapOutputsCounter.increment(1);
+      }
+    }
+
+    boolean inMemory() {
+      return true;
+    }
+
+    KeyValueBuffer getKey() { return key; }
+
+    DataInputBuffer getValue(DataInputBuffer value) throws IOException {
+      nextRawValue(value);
+      return value;
+    }
+
+    public long getLength() {
+      return reader.getLength();
+    }
+
+    KeyState readRawKey(DataInputBuffer nextKey) throws IOException {
+      KeyState keyState = reader.readRawKey(nextKey);
+      key.reset(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() 
- nextKey.getPosition());
+      return keyState;
+    }
+
+    boolean nextRawKey(DataInputBuffer nextKey) throws IOException {
+      boolean hasNext = reader.nextRawKey(nextKey);
+      key.reset(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() 
- nextKey.getPosition());
+      return hasNext;
+    }
+
+    void nextRawValue(DataInputBuffer value) throws IOException {
+      reader.nextRawValue(value);
+    }
+
+    void closeReader() throws IOException {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    void close() throws IOException {
+      closeReader();
+    }
+
+    public long getPosition() throws IOException {
+      return reader.getPosition();
+    }
+
+    // This method is used by BackupStore to extract the
+    // absolute position after a reset
+    long getActualPosition() throws IOException {
+      return reader.getPosition();
+    }
+
+    Reader getReader() {
+      return reader;
+    }
+
+    // This method is used by BackupStore to reinitialize the
+    // reader to start reading from a different segment offset
+    void reinitReader(int offset) throws IOException {
+    }
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static class DiskSegment extends Segment {
+
     FileSystem fs = null;
     Path file = null;
     boolean preserve = false; // Signifies whether the segment should be kept 
after a merge is complete. Checked in the close method.
@@ -234,10 +343,8 @@ public class TezMerger {
     boolean ifileReadAhead;
     int ifileReadAheadLength;
     int bufferSize = -1;
-    
-    TezCounter mapOutputsCounter = null;
 
-    public Segment(FileSystem fs, Path file,
+    public DiskSegment(FileSystem fs, Path file,
         CompressionCodec codec, boolean ifileReadAhead,
         int ifileReadAheadLength, int bufferSize, boolean preserve)
     throws IOException {
@@ -245,7 +352,7 @@ public class TezMerger {
           bufferSize, preserve, null);
     }
 
-    public Segment(FileSystem fs, Path file,
+    public DiskSegment(FileSystem fs, Path file,
                    CompressionCodec codec, boolean ifileReadAhead, int 
ifileReadAheadLenth,
                    int bufferSize, boolean preserve, TezCounter 
mergedMapOutputsCounter)
   throws IOException {
@@ -254,7 +361,7 @@ public class TezMerger {
           mergedMapOutputsCounter);
     }
 
-    public Segment(FileSystem fs, Path file,
+    public DiskSegment(FileSystem fs, Path file,
                    long segmentOffset, long segmentLength,
                    CompressionCodec codec, boolean ifileReadAhead,
                    int ifileReadAheadLength,  int bufferSize, 
@@ -263,11 +370,12 @@ public class TezMerger {
           ifileReadAheadLength, bufferSize, preserve, null);
     }
 
-    public Segment(FileSystem fs, Path file,
+    public DiskSegment(FileSystem fs, Path file,
         long segmentOffset, long segmentLength, CompressionCodec codec,
         boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize,
         boolean preserve, TezCounter mergedMapOutputsCounter)
     throws IOException {
+      super(null, mergedMapOutputsCounter);
       this.fs = fs;
       this.file = file;
       this.codec = codec;
@@ -278,94 +386,45 @@ public class TezMerger {
 
       this.segmentOffset = segmentOffset;
       this.segmentLength = segmentLength;
-      
-      this.mapOutputsCounter = mergedMapOutputsCounter;
-    }
-    
-    public Segment(Reader reader, boolean preserve) {
-      this(reader, preserve, null);
-    }
-    
-    public Segment(Reader reader, boolean preserve, 
-                   TezCounter mapOutputsCounter) {
-      this.reader = reader;
-      this.preserve = preserve;
-      
-      this.segmentLength = reader.getLength();
-      
-      this.mapOutputsCounter = mapOutputsCounter;
     }
 
-    void init(TezCounter readsCounter, TezCounter byetsReadCounter) throws 
IOException {      
-      if (reader == null) { 
-        FSDataInputStream in = fs.open(file);
-        in.seek(segmentOffset);
-        reader = new Reader(in, segmentLength, codec, readsCounter, 
byetsReadCounter,
-            ifileReadAhead, ifileReadAheadLength, bufferSize);
-      }
-      if (mapOutputsCounter != null) {
-        mapOutputsCounter.increment(1);
-      }
-    }
-    
-    boolean inMemory() {
-      return fs == null;
+    @Override
+    void init(TezCounter readsCounter, TezCounter bytesReadCounter) throws 
IOException {
+      super.init(readsCounter, bytesReadCounter);
+      FSDataInputStream in = fs.open(file);
+      in.seek(segmentOffset);
+      reader = new Reader(in, segmentLength, codec, readsCounter, 
bytesReadCounter, ifileReadAhead,
+          ifileReadAheadLength, bufferSize);
     }
-    
-    DataInputBuffer getKey() { return key; }
 
-    DataInputBuffer getValue(DataInputBuffer value) throws IOException {
-      nextRawValue(value);
-      return value;
+    @Override
+    boolean inMemory() {
+      return false;
     }
 
-    public long getLength() { 
+    @Override
+    public long getLength() {
       return (reader == null) ?
         segmentLength : reader.getLength();
     }
-    
-    KeyState readRawKey() throws IOException {
-      return reader.readRawKey(key);
-    }
-    
-    boolean nextRawKey() throws IOException {
-      return reader.nextRawKey(key);
-    }
 
-    void nextRawValue(DataInputBuffer value) throws IOException {
-      reader.nextRawValue(value);
-    }
-
-    void closeReader() throws IOException {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    }
-    
+    @Override
     void close() throws IOException {
-      closeReader();
+      super.close();
       if (!preserve && fs != null) {
         fs.delete(file, false);
       }
     }
-
-    public long getPosition() throws IOException {
-      return reader.getPosition();
-    }
-
-    // This method is used by BackupStore to extract the 
+    // This method is used by BackupStore to extract the
     // absolute position after a reset
+    @Override
     long getActualPosition() throws IOException {
       return segmentOffset + reader.getPosition();
     }
 
-    Reader getReader() {
-      return reader;
-    }
-    
     // This method is used by BackupStore to reinitialize the
     // reader to start reading from a different segment offset
+    @Override
     void reinitReader(int offset) throws IOException {
       if (!inMemory()) {
         closeReader();
@@ -401,8 +460,9 @@ public class TezMerger {
 
     Progressable reporter;
     
-    DataInputBuffer key;
+    final DataInputBuffer key = new DataInputBuffer();
     final DataInputBuffer value = new DataInputBuffer();
+    final DataInputBuffer nextKey = new DataInputBuffer();
     final DataInputBuffer diskIFileValue = new DataInputBuffer();
     
     Segment minSegment;
@@ -440,7 +500,7 @@ public class TezMerger {
       
       for (Path file : inputs) {
         LOG.debug("MergeQ: adding: " + file);
-        segments.add(new Segment(fs, file, codec, ifileReadAhead,
+        segments.add(new DiskSegment(fs, file, codec, ifileReadAhead,
                                       ifileReadAheadLength, ifileBufferSize,
                                       !deleteInputs, 
                                        (file.toString().endsWith(
@@ -516,7 +576,7 @@ public class TezMerger {
           populatePreviousKey();
         }
       }
-      hasNext = reader.readRawKey();
+      hasNext = reader.readRawKey(nextKey);
       long endPos = reader.getPosition();
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
@@ -543,13 +603,12 @@ public class TezMerger {
       Segment nextTop = top();
       if (nextTop != current) {
         //we have a different file. Compare it with previous key
-        DataInputBuffer nextKey = nextTop.getKey();
+        KeyValueBuffer nextKey = nextTop.getKey();
         int compare = compare(nextKey, prevKey);
         if (compare == 0) {
           //Same key is available in the next segment.
           hasNext = KeyState.SAME_KEY;
         }
-        nextKey.reset();
       }
     }
 
@@ -569,7 +628,8 @@ public class TezMerger {
       }
       minSegment = top();
       long startPos = minSegment.getPosition();
-      key = minSegment.getKey();
+      KeyValueBuffer nextKey = minSegment.getKey();
+      key.reset(nextKey.getData(), nextKey.getPosition(), nextKey.getLength());
       if (!minSegment.inMemory()) {
         //When we load the value from an inmemory segment, we reset
         //the "value" DIB in this class to the inmem segment's byte[].
@@ -588,26 +648,27 @@ public class TezMerger {
       long endPos = minSegment.getPosition();
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
+
       return true;
     }
 
-    int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
-      byte[] b1 = buf1.getData();
+    int compare(KeyValueBuffer nextKey, DataOutputBuffer buf2) {
+      byte[] b1 = nextKey.getData();
       byte[] b2 = buf2.getData();
-      int s1 = buf1.getPosition();
+      int s1 = nextKey.getPosition();
       int s2 = 0;
-      int l1 = buf1.getLength();
+      int l1 = nextKey.getLength();
       int l2 = buf2.getLength();
-      return comparator.compare(b1, s1, (l1 - s1), b2, s2, l2);
+      return comparator.compare(b1, s1, l1, b2, s2, l2);
     }
 
     protected boolean lessThan(Object a, Object b) {
-      DataInputBuffer key1 = ((Segment)a).getKey();
-      DataInputBuffer key2 = ((Segment)b).getKey();
+      KeyValueBuffer key1 = ((Segment)a).getKey();
+      KeyValueBuffer key2 = ((Segment)b).getKey();
       int s1 = key1.getPosition();
-      int l1 = key1.getLength() - s1;
+      int l1 = key1.getLength();
       int s2 = key2.getPosition();
-      int l2 = key2.getLength() - s2;
+      int l2 = key2.getLength();;
 
       return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, 
l2) < 0;
     }
@@ -680,7 +741,7 @@ public class TezMerger {
 
             segment.init(readsCounter, bytesReadCounter);
             long startPos = segment.getPosition();
-            boolean hasNext = segment.nextRawKey();
+            boolean hasNext = segment.nextRawKey(nextKey);
             long endPos = segment.getPosition();
             
             if (hasNext) {
@@ -781,7 +842,7 @@ public class TezMerger {
 
           // Add the newly create segment to the list of segments to be merged
           Segment tempSegment = 
-            new Segment(fs, outputFile, codec, ifileReadAhead,
+            new DiskSegment(fs, outputFile, codec, ifileReadAhead,
                 ifileReadAheadLength, ifileBufferSize, false);
 
           // Insert new merged segment into the sorted list

http://git-wip-us.apache.org/repos/asf/tez/blob/714d8c43/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 7cb7742..7fb4dc1 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -54,6 +54,7 @@ import 
org.apache.tez.runtime.library.common.sort.impl.TezMerger;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 
 import com.google.common.base.Preconditions;
@@ -1244,8 +1245,8 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
           outputContext.notifyProgress();
           TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
-          Segment s =
-            new Segment(rfs, filename[i], indexRecord.getStartOffset(),
+          DiskSegment s =
+            new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(),
                              indexRecord.getPartLength(), codec, 
ifileReadAhead,
                              ifileReadAheadLength, ifileBufferSize, true);
           segmentList.add(i, s);

http://git-wip-us.apache.org/repos/asf/tez/blob/714d8c43/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index edb9b15..7502f86 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -518,7 +518,7 @@ public class TestValuesIterator {
       }
       IFile.Reader reader = new InMemoryReader(mergeManager, null, 
bout.getBuffer(), 0,
           bout.getBuffer().length);
-      segments.add(new TezMerger.Segment(reader, true));
+      segments.add(new TezMerger.Segment(reader, null));
 
       data.clear();
       writer.close();

http://git-wip-us.apache.org/repos/asf/tez/blob/714d8c43/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index 8b52c44..3965095 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -731,7 +731,7 @@ public class TestTezMerger {
       writer.close();
       InMemoryReader reader = new InMemoryReader(merger, null, 
stream.getBuffer(), 0, stream.getLimit());
 
-      segmentList.add(new TezMerger.Segment(reader, true, null));
+      segmentList.add(new TezMerger.Segment(reader, null));
     }
     return segmentList;
   }
@@ -756,7 +756,7 @@ public class TestTezMerger {
       int repeatCount = ((i % 2 == 0) && keysPerSegment > 0) ? 
rnd.nextInt(keysPerSegment) : 0;
       Path ifilePath = writeIFile(keysPerSegment, repeatCount);
 
-      segmentList.add(new TezMerger.Segment(localFs, ifilePath, 0, 
localFs.getFileStatus
+      segmentList.add(new TezMerger.DiskSegment(localFs, ifilePath, 0, 
localFs.getFileStatus
           (ifilePath).getLen(), null, false, 1024, 1024, false, null));
     }
     return segmentList;

Reply via email to