[FLINK-1296] [runtime] Fix bug where the first record in a buffer is not 
correctly recognized as a large record


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76eaef0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76eaef0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76eaef0a

Branch: refs/heads/master
Commit: 76eaef0a6b7a089a6b1577492678cd5a259c743d
Parents: 28a62d8
Author: Stephan Ewen <[email protected]>
Authored: Thu Dec 18 16:28:34 2014 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Jan 21 12:01:36 2015 +0100

----------------------------------------------------------------------
 .../runtime/io/disk/RandomAccessInputView.java  | 21 +++------
 .../operators/sort/NormalizedKeySorter.java     | 49 +++++++++++++++-----
 .../operators/sort/UnilateralSortMerger.java    | 11 ++++-
 3 files changed, 52 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76eaef0a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
index 44e6398..718b8af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/RandomAccessInputView.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk;
 
 import java.io.EOFException;
@@ -28,12 +27,8 @@ import 
org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
 import org.apache.flink.runtime.util.MathUtils;
 
 
-/**
- *
- *
- */
-public class RandomAccessInputView extends AbstractPagedInputView implements 
SeekableDataInputView
-{      
+public class RandomAccessInputView extends AbstractPagedInputView implements 
SeekableDataInputView {
+       
        private final ArrayList<MemorySegment> segments;
        
        private int currentSegmentIndex;
@@ -51,8 +46,7 @@ public class RandomAccessInputView extends 
AbstractPagedInputView implements See
                this(segments, segmentSize, segmentSize);
        }
        
-       public RandomAccessInputView(ArrayList<MemorySegment> segments, int 
segmentSize, int limitInLastSegment)
-       {
+       public RandomAccessInputView(ArrayList<MemorySegment> segments, int 
segmentSize, int limitInLastSegment) {
                super(segments.get(0), segments.size() > 1 ? segmentSize : 
limitInLastSegment, 0);
                this.segments = segments;
                this.currentSegmentIndex = 0;
@@ -64,8 +58,7 @@ public class RandomAccessInputView extends 
AbstractPagedInputView implements See
 
 
        @Override
-       public void setReadPosition(long position)
-       {
+       public void setReadPosition(long position) {
                final int bufferNum = (int) (position >>> this.segmentSizeBits);
                final int offset = (int) (position & this.segmentSizeMask);
                
@@ -75,8 +68,7 @@ public class RandomAccessInputView extends 
AbstractPagedInputView implements See
 
 
        @Override
-       protected MemorySegment nextSegment(MemorySegment current) throws 
EOFException
-       {
+       protected MemorySegment nextSegment(MemorySegment current) throws 
EOFException {
                if (++this.currentSegmentIndex < this.segments.size()) {
                        return this.segments.get(this.currentSegmentIndex);
                } else {
@@ -86,8 +78,7 @@ public class RandomAccessInputView extends 
AbstractPagedInputView implements See
 
 
        @Override
-       protected int getLimitForSegment(MemorySegment segment)
-       {
+       protected int getLimitForSegment(MemorySegment segment) {
                return this.currentSegmentIndex == this.segments.size() - 1 ? 
this.limitInLastSegment : this.segmentSize;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76eaef0a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index 97b9236..fe87788 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.EOFException;
@@ -32,12 +31,16 @@ import 
org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
 import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * 
  */
 public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
        
+       private static final Logger LOG = 
LoggerFactory.getLogger(NormalizedKeySorter.class);
+       
        private static final int OFFSET_LEN = 8;
        
        private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16;
@@ -47,6 +50,10 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
        private static final int MIN_REQUIRED_BUFFERS = 3;
        
        private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;
+       
+       private static final long LARGE_RECORD_TAG = 1L << 63;
+       
+       private static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
 
        // 
------------------------------------------------------------------------
        //                               Members
@@ -293,8 +300,14 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                final long newOffset = this.recordCollector.getCurrentOffset();
                final boolean shortRecord = newOffset - 
this.currentDataBufferOffset < LARGE_RECORD_THRESHOLD;
                
+               if (!shortRecord && LOG.isDebugEnabled()) {
+                       LOG.debug("Put a large record ( >" + 
LARGE_RECORD_THRESHOLD + " into the sort buffer");
+               }
+               
                // add the pointer and the normalized key
-               
this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, shortRecord ? 
this.currentDataBufferOffset : -this.currentDataBufferOffset);
+               
this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, shortRecord ?
+                               this.currentDataBufferOffset : 
(this.currentDataBufferOffset | LARGE_RECORD_TAG));
+
                if (this.numKeyBytes != 0) {
                        this.comparator.putNormalizedKey(record, 
this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, 
this.numKeyBytes);
                }
@@ -317,7 +330,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                final int bufferNum = logicalPosition / 
this.indexEntriesPerSegment;
                final int segmentOffset = logicalPosition % 
this.indexEntriesPerSegment;
                
-               return 
Math.abs(this.sortIndex.get(bufferNum).getLong(segmentOffset * 
this.indexEntrySize));
+               return (this.sortIndex.get(bufferNum).getLong(segmentOffset * 
this.indexEntrySize)) & POINTER_MASK;
        }
        
        private final T getRecordFromBuffer(T reuse, long pointer) throws 
IOException {
@@ -370,8 +383,8 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                        return this.useNormKeyUninverted ? val : -val;
                }
                
-               final long pointerI = Math.abs(segI.getLong(segmentOffsetI));
-               final long pointerJ = Math.abs(segJ.getLong(segmentOffsetJ));
+               final long pointerI = segI.getLong(segmentOffsetI) & 
POINTER_MASK;
+               final long pointerJ = segJ.getLong(segmentOffsetJ)  & 
POINTER_MASK;
                
                return compareRecords(pointerI, pointerJ);
        }
@@ -415,8 +428,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                        private MemorySegment currentIndexSegment = 
sortIndex.get(0);
 
                        @Override
-                       public T next(T target)
-                       {
+                       public T next(T target) {
                                if (this.current < this.size) {
                                        this.current++;
                                        if (this.currentOffset > 
lastIndexEntryOffset) {
@@ -424,7 +436,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                                                this.currentIndexSegment = 
sortIndex.get(++this.currentSegment);
                                        }
                                        
-                                       long pointer = 
Math.abs(this.currentIndexSegment.getLong(this.currentOffset));
+                                       long pointer = 
this.currentIndexSegment.getLong(this.currentOffset) & POINTER_MASK;
                                        this.currentOffset += indexEntrySize;
                                        
                                        try {
@@ -485,6 +497,14 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
        public void writeToOutput(ChannelWriterOutputView output, 
LargeRecordHandler<T> largeRecordsOutput)
                        throws IOException
        {
+               if (LOG.isDebugEnabled()) {
+                       if (largeRecordsOutput == null) {
+                               LOG.debug("Spilling sort buffer without large 
record handling.");
+                       } else {
+                               LOG.debug("Spilling sort buffer with large 
record handling.");
+                       }
+               }
+               
                final int numRecords = this.numRecords;
                int currentMemSeg = 0;
                int currentRecord = 0;
@@ -497,12 +517,17 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                                final long pointer = 
currentIndexSegment.getLong(offset);
                                
                                // small records go into the regular spill 
file, large records into the special code path
-                               if (pointer >= 0) {
+                               if (pointer >= 0 || largeRecordsOutput == null) 
{
                                        
this.recordBuffer.setReadPosition(pointer);
                                        this.serializer.copy(this.recordBuffer, 
output);
                                }
                                else {
-                                       
this.recordBuffer.setReadPosition(-pointer);
+                                       
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Spilling large 
record to large record fetch file.");
+                                       }
+                                       
+                                       
this.recordBuffer.setReadPosition(pointer & POINTER_MASK);
                                        T record = 
this.serializer.deserialize(this.recordBuffer);
                                        largeRecordsOutput.addRecord(record);
                                }
@@ -530,7 +555,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                        if (num >= this.indexEntriesPerSegment && offset == 0) {
                                // full segment
                                for (;offset <= this.lastIndexEntryOffset; 
offset += this.indexEntrySize) {
-                                       final long pointer = 
Math.abs(currentIndexSegment.getLong(offset));
+                                       final long pointer = 
currentIndexSegment.getLong(offset) & POINTER_MASK;
                                        
this.recordBuffer.setReadPosition(pointer);
                                        this.serializer.copy(this.recordBuffer, 
output);
                                }
@@ -539,7 +564,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                                // partially filled segment
                                for (; num > 0 && offset <= 
this.lastIndexEntryOffset; num--, offset += this.indexEntrySize)
                                {
-                                       final long pointer = 
Math.abs(currentIndexSegment.getLong(offset));
+                                       final long pointer = 
currentIndexSegment.getLong(offset) & POINTER_MASK;
                                        
this.recordBuffer.setReadPosition(pointer);
                                        this.serializer.copy(this.recordBuffer, 
output);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/76eaef0a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index a534399..6e89300 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -913,10 +913,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                // write the last leftover pair, if we have one
                                if (leftoverRecord != null) {
                                        if (!buffer.write(leftoverRecord)) {
+                                               
                                                // did not fit in a fresh 
buffer, must be large...
                                                if (this.largeRecords != null) {
+                                                       if 
(LOG.isDebugEnabled()) {
+                                                               
LOG.debug("Large record did not fit into a fresh sort buffer. Putting into 
large record store.");
+                                                       }
                                                        
this.largeRecords.addRecord(leftoverRecord);
-                                               } else {
+                                               }
+                                               else {
                                                        throw new 
IOException("The record exceeds the maximum size of a sort buffer (current 
maximum: "
                                                                        + 
buffer.getCapacity() + " bytes).");
                                                }
@@ -1266,7 +1271,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                        
                        final FileIOChannel.Enumerator enumerator = 
this.ioManager.createChannelEnumerator();
                        List<ChannelWithBlockCount> channelIDs = new 
ArrayList<ChannelWithBlockCount>();
-
                        
                        // loop as long as the thread is marked alive and we do 
not see the final element
                        while (isRunning()) {
@@ -1367,6 +1371,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                        }
                                }
                                
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Sorting keys for large 
records.");
+                               }
                                largeRecords = 
largeRecordHandler.finishWriteAndSortKeys(longRecMem);
                        }
                        else {

Reply via email to