[FLINK-1296] [runtime] Fix bug where the first record in a buffer is not correctly recognized as a large record
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76eaef0a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76eaef0a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76eaef0a Branch: refs/heads/master Commit: 76eaef0a6b7a089a6b1577492678cd5a259c743d Parents: 28a62d8 Author: Stephan Ewen <[email protected]> Authored: Thu Dec 18 16:28:34 2014 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 21 12:01:36 2015 +0100 ---------------------------------------------------------------------- .../runtime/io/disk/RandomAccessInputView.java | 21 +++------ .../operators/sort/NormalizedKeySorter.java | 49 +++++++++++++++----- .../operators/sort/UnilateralSortMerger.java | 11 ++++- 3 files changed, 52 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/76eaef0a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java index 44e6398..718b8af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.io.disk; import java.io.EOFException; @@ -28,12 +27,8 @@ import org.apache.flink.runtime.memorymanager.AbstractPagedInputView; import org.apache.flink.runtime.util.MathUtils; -/** - * - * - */ -public class RandomAccessInputView extends AbstractPagedInputView implements SeekableDataInputView -{ +public class RandomAccessInputView extends AbstractPagedInputView implements SeekableDataInputView { + private final ArrayList<MemorySegment> segments; private int currentSegmentIndex; @@ -51,8 +46,7 @@ public class RandomAccessInputView extends AbstractPagedInputView implements See this(segments, segmentSize, segmentSize); } - public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize, int limitInLastSegment) - { + public RandomAccessInputView(ArrayList<MemorySegment> segments, int segmentSize, int limitInLastSegment) { super(segments.get(0), segments.size() > 1 ? segmentSize : limitInLastSegment, 0); this.segments = segments; this.currentSegmentIndex = 0; @@ -64,8 +58,7 @@ public class RandomAccessInputView extends AbstractPagedInputView implements See @Override - public void setReadPosition(long position) - { + public void setReadPosition(long position) { final int bufferNum = (int) (position >>> this.segmentSizeBits); final int offset = (int) (position & this.segmentSizeMask); @@ -75,8 +68,7 @@ public class RandomAccessInputView extends AbstractPagedInputView implements See @Override - protected MemorySegment nextSegment(MemorySegment current) throws EOFException - { + protected MemorySegment nextSegment(MemorySegment current) throws EOFException { if (++this.currentSegmentIndex < this.segments.size()) { return this.segments.get(this.currentSegmentIndex); } else { @@ -86,8 +78,7 @@ public class RandomAccessInputView extends AbstractPagedInputView implements See @Override - protected int getLimitForSegment(MemorySegment segment) - { + protected int getLimitForSegment(MemorySegment segment) { return 
this.currentSegmentIndex == this.segments.size() - 1 ? this.limitInLastSegment : this.segmentSize; } } http://git-wip-us.apache.org/repos/asf/flink/blob/76eaef0a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java index 97b9236..fe87788 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators.sort; import java.io.EOFException; @@ -32,12 +31,16 @@ import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView; import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource; import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * */ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { + private static final Logger LOG = LoggerFactory.getLogger(NormalizedKeySorter.class); + private static final int OFFSET_LEN = 8; private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16; @@ -47,6 +50,10 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { private static final int MIN_REQUIRED_BUFFERS = 3; private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024; + + private static final long LARGE_RECORD_TAG = 1L << 63; + + private static final long POINTER_MASK = LARGE_RECORD_TAG - 1; // ------------------------------------------------------------------------ // Members @@ -293,8 +300,14 @@ public final class NormalizedKeySorter<T> 
implements InMemorySorter<T> { final long newOffset = this.recordCollector.getCurrentOffset(); final boolean shortRecord = newOffset - this.currentDataBufferOffset < LARGE_RECORD_THRESHOLD; + if (!shortRecord && LOG.isDebugEnabled()) { + LOG.debug("Put a large record ( >" + LARGE_RECORD_THRESHOLD + " into the sort buffer"); + } + // add the pointer and the normalized key - this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, shortRecord ? this.currentDataBufferOffset : -this.currentDataBufferOffset); + this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, shortRecord ? + this.currentDataBufferOffset : (this.currentDataBufferOffset | LARGE_RECORD_TAG)); + if (this.numKeyBytes != 0) { this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes); } @@ -317,7 +330,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { final int bufferNum = logicalPosition / this.indexEntriesPerSegment; final int segmentOffset = logicalPosition % this.indexEntriesPerSegment; - return Math.abs(this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize)); + return (this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize)) & POINTER_MASK; } private final T getRecordFromBuffer(T reuse, long pointer) throws IOException { @@ -370,8 +383,8 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { return this.useNormKeyUninverted ? 
val : -val; } - final long pointerI = Math.abs(segI.getLong(segmentOffsetI)); - final long pointerJ = Math.abs(segJ.getLong(segmentOffsetJ)); + final long pointerI = segI.getLong(segmentOffsetI) & POINTER_MASK; + final long pointerJ = segJ.getLong(segmentOffsetJ) & POINTER_MASK; return compareRecords(pointerI, pointerJ); } @@ -415,8 +428,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { private MemorySegment currentIndexSegment = sortIndex.get(0); @Override - public T next(T target) - { + public T next(T target) { if (this.current < this.size) { this.current++; if (this.currentOffset > lastIndexEntryOffset) { @@ -424,7 +436,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { this.currentIndexSegment = sortIndex.get(++this.currentSegment); } - long pointer = Math.abs(this.currentIndexSegment.getLong(this.currentOffset)); + long pointer = this.currentIndexSegment.getLong(this.currentOffset) & POINTER_MASK; this.currentOffset += indexEntrySize; try { @@ -485,6 +497,14 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput) throws IOException { + if (LOG.isDebugEnabled()) { + if (largeRecordsOutput == null) { + LOG.debug("Spilling sort buffer without large record handling."); + } else { + LOG.debug("Spilling sort buffer with large record handling."); + } + } + final int numRecords = this.numRecords; int currentMemSeg = 0; int currentRecord = 0; @@ -497,12 +517,17 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { final long pointer = currentIndexSegment.getLong(offset); // small records go into the regular spill file, large records into the special code path - if (pointer >= 0) { + if (pointer >= 0 || largeRecordsOutput == null) { this.recordBuffer.setReadPosition(pointer); this.serializer.copy(this.recordBuffer, output); } else { - 
this.recordBuffer.setReadPosition(-pointer); + + if (LOG.isDebugEnabled()) { + LOG.debug("Spilling large record to large record fetch file."); + } + + this.recordBuffer.setReadPosition(pointer & POINTER_MASK); T record = this.serializer.deserialize(this.recordBuffer); largeRecordsOutput.addRecord(record); } @@ -530,7 +555,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { if (num >= this.indexEntriesPerSegment && offset == 0) { // full segment for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) { - final long pointer = Math.abs(currentIndexSegment.getLong(offset)); + final long pointer = currentIndexSegment.getLong(offset) & POINTER_MASK; this.recordBuffer.setReadPosition(pointer); this.serializer.copy(this.recordBuffer, output); } @@ -539,7 +564,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { // partially filled segment for (; num > 0 && offset <= this.lastIndexEntryOffset; num--, offset += this.indexEntrySize) { - final long pointer = Math.abs(currentIndexSegment.getLong(offset)); + final long pointer = currentIndexSegment.getLong(offset) & POINTER_MASK; this.recordBuffer.setReadPosition(pointer); this.serializer.copy(this.recordBuffer, output); } http://git-wip-us.apache.org/repos/asf/flink/blob/76eaef0a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java index a534399..6e89300 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java @@ -913,10 +913,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // write the 
last leftover pair, if we have one if (leftoverRecord != null) { if (!buffer.write(leftoverRecord)) { + // did not fit in a fresh buffer, must be large... if (this.largeRecords != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Large record did not fit into a fresh sort buffer. Putting into large record store."); + } this.largeRecords.addRecord(leftoverRecord); - } else { + } + else { throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + buffer.getCapacity() + " bytes)."); } @@ -1266,7 +1271,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> { final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator(); List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>(); - // loop as long as the thread is marked alive and we do not see the final element while (isRunning()) { @@ -1367,6 +1371,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> { } } + if (LOG.isDebugEnabled()) { + LOG.debug("Sorting keys for large records."); + } largeRecords = largeRecordHandler.finishWriteAndSortKeys(longRecMem); } else {
