Repository: tez
Updated Branches:
  refs/heads/master 53981d4d2 -> a99867786
TEZ-3202. Reduce the memory need for jobs with high number of segments (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9986778 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9986778 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9986778 Branch: refs/heads/master Commit: a99867786513dc6e07839b8f3adba2e79082339e Parents: 53981d4 Author: Jonathan Eagles <[email protected]> Authored: Thu Apr 14 15:27:39 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Apr 14 15:27:39 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../shuffle/orderedgrouped/MergeManager.java | 13 +- .../common/sort/impl/PipelinedSorter.java | 5 +- .../library/common/sort/impl/TezMerger.java | 245 ++++++++++++------- .../common/sort/impl/dflt/DefaultSorter.java | 5 +- .../library/common/TestValuesIterator.java | 2 +- .../library/common/sort/impl/TestTezMerger.java | 4 +- 7 files changed, 171 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a9986778/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9e90ae5..46ad9b6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.4: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3202. Reduce the memory need for jobs with high number of segments Release 0.8.3: 2016-04-14 @@ -433,6 +434,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3202. Reduce the memory need for jobs with high number of segments TEZ-3188. Move tez.submit.hosts out of TezConfiguration to TezConfigurationConstants. TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle TEZ-3177. 
Non-DAG events should use the session domain or no domain if the data does not need protection. http://git-wip-us.apache.org/repos/asf/tez/blob/a9986778/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 4c999b4..9e2fbd4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -48,6 +48,7 @@ import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import org.apache.tez.runtime.library.common.sort.impl.TezMerger; +import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles; @@ -453,7 +454,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { commitMemory -= size; usedMemory -= size; if (LOG.isDebugEnabled()) { - LOG.debug("Notifying unreserve : commitMemory=" + commitMemory + ", usedMemory=" + usedMemory + LOG.debug("Notifying unreserve : size=" + size + ", commitMemory=" + commitMemory + ", usedMemory=" + usedMemory + ", mergeThreshold=" + mergeThreshold); } notifyAll(); @@ -674,7 +675,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { mergeOutputSize += mo.getSize(); 
IFile.Reader reader = new InMemoryReader(MergeManager.this, mo.getAttemptIdentifier(), mo.getMemory(), 0, mo.getMemory().length); - inMemorySegments.add(new Segment(reader, true, + inMemorySegments.add(new Segment(reader, (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null))); lastAddedMapOutput = mo; it.remove(); @@ -909,7 +910,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { } final Path file = fileChunk.getPath(); approxOutputSize += size; - Segment segment = new Segment(rfs, file, offset, size, codec, ifileReadAhead, + DiskSegment segment = new DiskSegment(rfs, file, offset, size, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, preserve); inputSegments.add(segment); } @@ -1008,7 +1009,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { IFile.Reader reader = new InMemoryReader(MergeManager.this, mo.getAttemptIdentifier(), data, 0, (int)size); - inMemorySegments.add(new Segment(reader, true, + inMemorySegments.add(new Segment(reader, (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null))); } @@ -1170,7 +1171,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { final long fileOffset = fileChunk.getOffset(); final boolean preserve = fileChunk.isLocalFile(); - diskSegments.add(new Segment(fs, file, fileOffset, fileLength, codec, ifileReadAhead, + diskSegments.add(new DiskSegment(fs, file, fileOffset, fileLength, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, preserve, counter)); } LOG.info("Merging " + onDisk.length + " files, " + @@ -1203,7 +1204,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { return diskMerge; } finalSegments.add(new Segment( - new RawKVIteratorReader(diskMerge, onDiskBytes), true)); + new RawKVIteratorReader(diskMerge, onDiskBytes), null)); } // This is doing nothing but creating an iterator over the segments. 
return TezMerger.merge(job, fs, keyClass, valueClass, http://git-wip-us.apache.org/repos/asf/tez/blob/a9986778/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index fc65622..c698e12 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -57,6 +57,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; +import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import org.apache.tez.runtime.library.utils.LocalProgress; @@ -730,8 +731,8 @@ public class PipelinedSorter extends ExternalSorter { Path spillFilename = spillFilePaths.get(i); TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); - Segment s = - new Segment(rfs, spillFilename, indexRecord.getStartOffset(), + DiskSegment s = + new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(), indexRecord.getPartLength(), codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, true); segmentList.add(i, s); http://git-wip-us.apache.org/repos/asf/tez/blob/a9986778/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index 44618a0..17e0fe2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -232,10 +232,119 @@ public class TezMerger { @InterfaceAudience.Private @InterfaceStability.Unstable - public static class Segment<K extends Object, V extends Object> { + static class KeyValueBuffer { + private byte[] buf; + private int position; + private int length; + + public KeyValueBuffer(byte buf[], int position, int length) { + reset(buf, position, length); + } + + public void reset(byte[] input, int position, int length) { + this.buf = input; + this.position = position; + this.length = length; + } + + public byte[] getData() { + return buf; + } + + public int getPosition() { + return position; + } + + public int getLength() { + return length; + } + } + + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static class Segment { + static final byte[] EMPTY_BYTES = new byte[0]; Reader reader = null; - final DataInputBuffer key = new DataInputBuffer(); - + final KeyValueBuffer key = new KeyValueBuffer(EMPTY_BYTES, 0, 0); + TezCounter mapOutputsCounter = null; + + public Segment(Reader reader, TezCounter mapOutputsCounter) { + this.reader = reader; + this.mapOutputsCounter = mapOutputsCounter; + } + + void init(TezCounter readsCounter, TezCounter bytesReadCounter) throws IOException { + if (mapOutputsCounter != null) { + mapOutputsCounter.increment(1); + } + } + + boolean inMemory() { + return true; + } + + KeyValueBuffer getKey() { return key; } + + DataInputBuffer getValue(DataInputBuffer value) throws IOException { + nextRawValue(value); + return value; + } + + public long getLength() { + return reader.getLength(); 
+ } + + KeyState readRawKey(DataInputBuffer nextKey) throws IOException { + KeyState keyState = reader.readRawKey(nextKey); + key.reset(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()); + return keyState; + } + + boolean nextRawKey(DataInputBuffer nextKey) throws IOException { + boolean hasNext = reader.nextRawKey(nextKey); + key.reset(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()); + return hasNext; + } + + void nextRawValue(DataInputBuffer value) throws IOException { + reader.nextRawValue(value); + } + + void closeReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + void close() throws IOException { + closeReader(); + } + + public long getPosition() throws IOException { + return reader.getPosition(); + } + + // This method is used by BackupStore to extract the + // absolute position after a reset + long getActualPosition() throws IOException { + return reader.getPosition(); + } + + Reader getReader() { + return reader; + } + + // This method is used by BackupStore to reinitialize the + // reader to start reading from a different segment offset + void reinitReader(int offset) throws IOException { + } + } + + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static class DiskSegment extends Segment { + FileSystem fs = null; Path file = null; boolean preserve = false; // Signifies whether the segment should be kept after a merge is complete. Checked in the close method. 
@@ -245,10 +354,8 @@ public class TezMerger { boolean ifileReadAhead; int ifileReadAheadLength; int bufferSize = -1; - - TezCounter mapOutputsCounter = null; - public Segment(FileSystem fs, Path file, + public DiskSegment(FileSystem fs, Path file, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize, boolean preserve) throws IOException { @@ -256,7 +363,7 @@ public class TezMerger { bufferSize, preserve, null); } - public Segment(FileSystem fs, Path file, + public DiskSegment(FileSystem fs, Path file, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLenth, int bufferSize, boolean preserve, TezCounter mergedMapOutputsCounter) throws IOException { @@ -265,7 +372,7 @@ public class TezMerger { mergedMapOutputsCounter); } - public Segment(FileSystem fs, Path file, + public DiskSegment(FileSystem fs, Path file, long segmentOffset, long segmentLength, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize, @@ -274,11 +381,12 @@ public class TezMerger { ifileReadAheadLength, bufferSize, preserve, null); } - public Segment(FileSystem fs, Path file, + public DiskSegment(FileSystem fs, Path file, long segmentOffset, long segmentLength, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize, boolean preserve, TezCounter mergedMapOutputsCounter) throws IOException { + super(null, mergedMapOutputsCounter); this.fs = fs; this.file = file; this.codec = codec; @@ -289,94 +397,45 @@ public class TezMerger { this.segmentOffset = segmentOffset; this.segmentLength = segmentLength; - - this.mapOutputsCounter = mergedMapOutputsCounter; - } - - public Segment(Reader reader, boolean preserve) { - this(reader, preserve, null); - } - - public Segment(Reader reader, boolean preserve, - TezCounter mapOutputsCounter) { - this.reader = reader; - this.preserve = preserve; - - this.segmentLength = reader.getLength(); - - this.mapOutputsCounter = mapOutputsCounter; } - void 
init(TezCounter readsCounter, TezCounter byetsReadCounter) throws IOException { - if (reader == null) { - FSDataInputStream in = fs.open(file); - in.seek(segmentOffset); - reader = new Reader(in, segmentLength, codec, readsCounter, byetsReadCounter, - ifileReadAhead, ifileReadAheadLength, bufferSize); - } - if (mapOutputsCounter != null) { - mapOutputsCounter.increment(1); - } - } - - boolean inMemory() { - return fs == null; + @Override + void init(TezCounter readsCounter, TezCounter bytesReadCounter) throws IOException { + super.init(readsCounter, bytesReadCounter); + FSDataInputStream in = fs.open(file); + in.seek(segmentOffset); + reader = new Reader(in, segmentLength, codec, readsCounter, bytesReadCounter, ifileReadAhead, + ifileReadAheadLength, bufferSize); } - - DataInputBuffer getKey() { return key; } - DataInputBuffer getValue(DataInputBuffer value) throws IOException { - nextRawValue(value); - return value; + @Override + boolean inMemory() { + return false; } - public long getLength() { + @Override + public long getLength() { return (reader == null) ? 
segmentLength : reader.getLength(); } - - KeyState readRawKey() throws IOException { - return reader.readRawKey(key); - } - - boolean nextRawKey() throws IOException { - return reader.nextRawKey(key); - } - void nextRawValue(DataInputBuffer value) throws IOException { - reader.nextRawValue(value); - } - - void closeReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - } - + @Override void close() throws IOException { - closeReader(); + super.close(); if (!preserve && fs != null) { fs.delete(file, false); } } - - public long getPosition() throws IOException { - return reader.getPosition(); - } - - // This method is used by BackupStore to extract the + // This method is used by BackupStore to extract the // absolute position after a reset + @Override long getActualPosition() throws IOException { return segmentOffset + reader.getPosition(); } - Reader getReader() { - return reader; - } - // This method is used by BackupStore to reinitialize the // reader to start reading from a different segment offset + @Override void reinitReader(int offset) throws IOException { if (!inMemory()) { closeReader(); @@ -412,8 +471,9 @@ public class TezMerger { Progressable reporter; - DataInputBuffer key; + final DataInputBuffer key = new DataInputBuffer(); final DataInputBuffer value = new DataInputBuffer(); + final DataInputBuffer nextKey = new DataInputBuffer(); final DataInputBuffer diskIFileValue = new DataInputBuffer(); Segment minSegment; @@ -451,7 +511,7 @@ public class TezMerger { for (Path file : inputs) { LOG.debug("MergeQ: adding: " + file); - segments.add(new Segment(fs, file, codec, ifileReadAhead, + segments.add(new DiskSegment(fs, file, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, !deleteInputs, (file.toString().endsWith( @@ -527,7 +587,7 @@ public class TezMerger { populatePreviousKey(); } } - hasNext = reader.readRawKey(); + hasNext = reader.readRawKey(nextKey); long endPos = reader.getPosition(); 
totalBytesProcessed += endPos - startPos; mergeProgress.set(totalBytesProcessed * progPerByte); @@ -554,13 +614,12 @@ public class TezMerger { Segment nextTop = top(); if (nextTop != current) { //we have a different file. Compare it with previous key - DataInputBuffer nextKey = nextTop.getKey(); + KeyValueBuffer nextKey = nextTop.getKey(); int compare = compare(nextKey, prevKey); if (compare == 0) { //Same key is available in the next segment. hasNext = KeyState.SAME_KEY; } - nextKey.reset(); } } @@ -580,7 +639,8 @@ public class TezMerger { } minSegment = top(); long startPos = minSegment.getPosition(); - key = minSegment.getKey(); + KeyValueBuffer nextKey = minSegment.getKey(); + key.reset(nextKey.getData(), nextKey.getPosition(), nextKey.getLength()); if (!minSegment.inMemory()) { //When we load the value from an inmemory segment, we reset //the "value" DIB in this class to the inmem segment's byte[]. @@ -599,26 +659,27 @@ public class TezMerger { long endPos = minSegment.getPosition(); totalBytesProcessed += endPos - startPos; mergeProgress.set(totalBytesProcessed * progPerByte); + return true; } - int compare(DataInputBuffer buf1, DataOutputBuffer buf2) { - byte[] b1 = buf1.getData(); + int compare(KeyValueBuffer nextKey, DataOutputBuffer buf2) { + byte[] b1 = nextKey.getData(); byte[] b2 = buf2.getData(); - int s1 = buf1.getPosition(); + int s1 = nextKey.getPosition(); int s2 = 0; - int l1 = buf1.getLength(); + int l1 = nextKey.getLength(); int l2 = buf2.getLength(); - return comparator.compare(b1, s1, (l1 - s1), b2, s2, l2); + return comparator.compare(b1, s1, l1, b2, s2, l2); } protected boolean lessThan(Object a, Object b) { - DataInputBuffer key1 = ((Segment)a).getKey(); - DataInputBuffer key2 = ((Segment)b).getKey(); + KeyValueBuffer key1 = ((Segment)a).getKey(); + KeyValueBuffer key2 = ((Segment)b).getKey(); int s1 = key1.getPosition(); - int l1 = key1.getLength() - s1; + int l1 = key1.getLength(); int s2 = key2.getPosition(); - int l2 = key2.getLength() 
- s2; + int l2 = key2.getLength();; return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0; } @@ -691,7 +752,7 @@ public class TezMerger { segment.init(readsCounter, bytesReadCounter); long startPos = segment.getPosition(); - boolean hasNext = segment.nextRawKey(); + boolean hasNext = segment.nextRawKey(nextKey); long endPos = segment.getPosition(); if (hasNext) { @@ -792,7 +853,7 @@ public class TezMerger { // Add the newly create segment to the list of segments to be merged Segment tempSegment = - new Segment(fs, outputFile, codec, ifileReadAhead, + new DiskSegment(fs, outputFile, codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, false); // Insert new merged segment into the sorted list http://git-wip-us.apache.org/repos/asf/tez/blob/a9986778/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 67da617..a6a60c2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -55,6 +55,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger; import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; +import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; import com.google.common.base.Preconditions; @@ -1287,8 +1288,8 @@ public final class 
DefaultSorter extends ExternalSorter implements IndexedSortab outputContext.notifyProgress(); TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); - Segment s = - new Segment(rfs, filename[i], indexRecord.getStartOffset(), + DiskSegment s = + new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(), indexRecord.getPartLength(), codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize, true); segmentList.add(i, s); http://git-wip-us.apache.org/repos/asf/tez/blob/a9986778/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java index f3f0f6a..642f02b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java @@ -525,7 +525,7 @@ public class TestValuesIterator { } IFile.Reader reader = new InMemoryReader(mergeManager, null, bout.getBuffer(), 0, bout.getBuffer().length); - segments.add(new TezMerger.Segment(reader, true)); + segments.add(new TezMerger.Segment(reader, null)); data.clear(); writer.close(); http://git-wip-us.apache.org/repos/asf/tez/blob/a9986778/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java index b654b0f..b35c85f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java @@ -732,7 +732,7 @@ public class TestTezMerger { writer.close(); InMemoryReader reader = new InMemoryReader(merger, null, stream.getBuffer(), 0, stream.getLimit()); - segmentList.add(new TezMerger.Segment(reader, true, null)); + segmentList.add(new TezMerger.Segment(reader, null)); } return segmentList; } @@ -757,7 +757,7 @@ public class TestTezMerger { int repeatCount = ((i % 2 == 0) && keysPerSegment > 0) ? rnd.nextInt(keysPerSegment) : 0; Path ifilePath = writeIFile(keysPerSegment, repeatCount); - segmentList.add(new TezMerger.Segment(localFs, ifilePath, 0, localFs.getFileStatus + segmentList.add(new TezMerger.DiskSegment(localFs, ifilePath, 0, localFs.getFileStatus (ifilePath).getLen(), null, false, 1024, 1024, false, null)); } return segmentList;
