Repository: flink
Updated Branches:
  refs/heads/master 7610588bc -> 7df6a3d72


[FLINK-1296] [runtime] Limit memory consumption on merges by spilling larger 
records into a single special merge stream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a62d84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a62d84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a62d84

Branch: refs/heads/master
Commit: 28a62d844e7a069a05383bb1e9e71f9daea41199
Parents: be8e1f1
Author: Stephan Ewen <[email protected]>
Authored: Thu Dec 18 11:42:57 2014 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Jan 21 12:01:35 2015 +0100

----------------------------------------------------------------------
 .../operators/hash/CompactingHashTable.java     |   4 +-
 .../operators/sort/FixedLengthRecordSorter.java |  22 +-
 .../runtime/operators/sort/InMemorySorter.java  |  14 +-
 .../operators/sort/NormalizedKeySorter.java     |  98 ++--
 .../runtime/operators/sort/SortBuffer.java      | 510 -------------------
 .../operators/sort/UnilateralSortMerger.java    | 177 +------
 .../apache/flink/runtime/util/IntArrayList.java |  28 +-
 .../operators/sort/ExternalSortITCase.java      | 240 +--------
 .../sort/ExternalSortLargeRecordsITCase.java    | 455 +++++++++++++++++
 .../sort/MassiveStringSortingITCase.java        |   4 -
 10 files changed, 570 insertions(+), 982 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 301aa82..7107972 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -1032,7 +1032,7 @@ public class CompactingHashTable<T> extends 
AbstractMutableHashTable<T>{
                                                int oldBucketCount = 0;
                                                int newBucketCount = 0;
                                                while(!hashList.isEmpty()) {
-                                                       hash = 
hashList.removeInt(hashList.size()-1);
+                                                       hash = 
hashList.removeLast();
                                                        pointer = 
pointerList.removeLong(pointerList.size()-1);
                                                        posHashCode = hash % 
this.numBuckets;
                                                        if(posHashCode == 
bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
@@ -1061,7 +1061,7 @@ public class CompactingHashTable<T> extends 
AbstractMutableHashTable<T>{
                                int bucketInSegmentPos = 0;
                                MemorySegment bucket = null;
                                while(!overflowHashes.isEmpty()) {
-                                       hash = 
overflowHashes.removeInt(overflowHashes.size()-1);
+                                       hash = overflowHashes.removeLast();
                                        pointer = 
overflowPointers.removeLong(overflowPointers.size()-1);
                                        posHashCode = hash % this.numBuckets; 
                                        bucketArrayPos = posHashCode >>> 
this.bucketsPerSegmentBits;

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index a3766e7..cd982c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -186,25 +186,20 @@ public final class FixedLengthRecordSorter<T> implements 
InMemorySorter<T> {
                return this.freeMemory;
        }
        
-       /**
-        * Gets the total capacity of this sorter, in bytes.
-        * 
-        * @return The sorter's total capacity.
-        */
        @Override
        public long getCapacity() {
                return ((long) this.totalNumBuffers) * this.segmentSize;
        }
        
-       /**
-        * Gets the number of bytes currently occupied in this sorter.
-        * 
-        * @return The number of bytes occupied.
-        */
        @Override
        public long getOccupancy() {
                return this.sortBufferBytes;
        }
+       
+       @Override
+       public long getNumRecordBytes() {
+               return this.sortBufferBytes;
+       }
 
        // 
-------------------------------------------------------------------------
        // Retrieving and Writing
@@ -430,6 +425,13 @@ public final class FixedLengthRecordSorter<T> implements 
InMemorySorter<T> {
                }
        }
        
+       @Override
+       public void writeToOutput(ChannelWriterOutputView output, 
LargeRecordHandler<T> largeRecordsOutput)
+                       throws IOException
+       {
+               writeToOutput(output);
+       }
+       
        /**
         * Writes a subset of the records in this buffer in their logical order 
to the given output.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
index 7dc4f97..633ec70 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
@@ -25,7 +25,6 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  *
  */
@@ -58,13 +57,20 @@ public interface InMemorySorter<T> extends IndexedSortable {
        long getCapacity();
        
        /**
-        * Gets the number of bytes currently occupied in this sorter.
+        * Gets the number of bytes currently occupied in this sorter, records 
and sort index.
         * 
         * @return The number of bytes occupied.
         */
        long getOccupancy();
        
        /**
+        * Gets the number of bytes occupied by records only.
+        * 
+        * @return The number of bytes occupied by records.
+        */
+       long getNumRecordBytes();
+       
+       /**
         * Gets the record at the given logical position.
         * 
         * @param reuse The reuse object to deserialize the record into.
@@ -96,7 +102,9 @@ public interface InMemorySorter<T> extends IndexedSortable {
         * @param output The output view to write the records to.
         * @throws IOException Thrown, if an I/O exception occurred writing to 
the output view.
         */
-       public void writeToOutput(final ChannelWriterOutputView output) throws 
IOException;
+       public void writeToOutput(ChannelWriterOutputView output) throws 
IOException;
+       
+       public void writeToOutput(ChannelWriterOutputView output, 
LargeRecordHandler<T> largeRecordsOutput) throws IOException;
        
        /**
         * Writes a subset of the records in this buffer in their logical order 
to the given output.

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index c382708..97b9236 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -45,6 +45,8 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
        private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;
        
        private static final int MIN_REQUIRED_BUFFERS = 3;
+       
+       private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;
 
        // 
------------------------------------------------------------------------
        //                               Members
@@ -227,25 +229,20 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                return this.freeMemory;
        }
        
-       /**
-        * Gets the total capacity of this sorter, in bytes.
-        * 
-        * @return The sorter's total capacity.
-        */
        @Override
        public long getCapacity() {
                return ((long) this.totalNumBuffers) * this.segmentSize;
        }
        
-       /**
-        * Gets the number of bytes currently occupied in this sorter.
-        * 
-        * @return The number of bytes occupied.
-        */
        @Override
        public long getOccupancy() {
                return this.currentDataBufferOffset + this.sortIndexBytes;
        }
+       
+       @Override
+       public long getNumRecordBytes() {
+               return this.currentDataBufferOffset;
+       }
 
        // 
-------------------------------------------------------------------------
        // Retrieving and Writing
@@ -285,22 +282,27 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                        }
                }
                
-               // add the pointer and the normalized key
-               
this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, 
this.currentDataBufferOffset);
-               if(this.numKeyBytes != 0) {
-                       this.comparator.putNormalizedKey(record, 
this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, 
this.numKeyBytes);
-               }
-               
                // serialize the record into the data buffers
                try {
                        this.serializer.serialize(record, this.recordCollector);
-                       this.currentSortIndexOffset += this.indexEntrySize;
-                       this.currentDataBufferOffset = 
this.recordCollector.getCurrentOffset();
-                       this.numRecords++;
-                       return true;
-               } catch (EOFException eofex) {
+               }
+               catch (EOFException e) {
                        return false;
                }
+               
+               final long newOffset = this.recordCollector.getCurrentOffset();
+               final boolean shortRecord = newOffset - 
this.currentDataBufferOffset < LARGE_RECORD_THRESHOLD;
+               
+               // add the pointer and the normalized key
+               
this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, shortRecord ? 
this.currentDataBufferOffset : -this.currentDataBufferOffset);
+               if (this.numKeyBytes != 0) {
+                       this.comparator.putNormalizedKey(record, 
this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, 
this.numKeyBytes);
+               }
+               
+               this.currentSortIndexOffset += this.indexEntrySize;
+               this.currentDataBufferOffset = newOffset;
+               this.numRecords++;
+               return true;
        }
        
        // 
------------------------------------------------------------------------
@@ -315,7 +317,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                final int bufferNum = logicalPosition / 
this.indexEntriesPerSegment;
                final int segmentOffset = logicalPosition % 
this.indexEntriesPerSegment;
                
-               return this.sortIndex.get(bufferNum).getLong(segmentOffset * 
this.indexEntrySize);
+               return 
Math.abs(this.sortIndex.get(bufferNum).getLong(segmentOffset * 
this.indexEntrySize));
        }
        
        private final T getRecordFromBuffer(T reuse, long pointer) throws 
IOException {
@@ -368,8 +370,8 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                        return this.useNormKeyUninverted ? val : -val;
                }
                
-               final long pointerI = segI.getLong(segmentOffsetI);
-               final long pointerJ = segJ.getLong(segmentOffsetJ);
+               final long pointerI = Math.abs(segI.getLong(segmentOffsetI));
+               final long pointerJ = Math.abs(segJ.getLong(segmentOffsetJ));
                
                return compareRecords(pointerI, pointerJ);
        }
@@ -422,7 +424,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                                                this.currentIndexSegment = 
sortIndex.get(++this.currentSegment);
                                        }
                                        
-                                       long pointer = 
this.currentIndexSegment.getLong(this.currentOffset);
+                                       long pointer = 
Math.abs(this.currentIndexSegment.getLong(this.currentOffset));
                                        this.currentOffset += indexEntrySize;
                                        
                                        try {
@@ -475,30 +477,34 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
         * @throws IOException Thrown, if an I/O exception occurred writing to 
the output view.
         */
        @Override
-       public void writeToOutput(final ChannelWriterOutputView output) throws 
IOException {
-               int recordsLeft = this.numRecords;
+       public void writeToOutput(ChannelWriterOutputView output) throws 
IOException {
+               writeToOutput(output, null);
+       }
+       
+       @Override
+       public void writeToOutput(ChannelWriterOutputView output, 
LargeRecordHandler<T> largeRecordsOutput)
+                       throws IOException
+       {
+               final int numRecords = this.numRecords;
                int currentMemSeg = 0;
-               while (recordsLeft > 0)
-               {
+               int currentRecord = 0;
+               
+               while (currentRecord < numRecords) {
                        final MemorySegment currentIndexSegment = 
this.sortIndex.get(currentMemSeg++);
-                       int offset = 0;
-                       // check whether we have a full or partially full 
segment
-                       if (recordsLeft >= this.indexEntriesPerSegment) {
-                               // full segment
-                               for (;offset <= this.lastIndexEntryOffset; 
offset += this.indexEntrySize) {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
+
+                       // go through all records in the memory segment
+                       for (int offset = 0; currentRecord < numRecords && 
offset <= this.lastIndexEntryOffset; currentRecord++, offset += 
this.indexEntrySize) {
+                               final long pointer = 
currentIndexSegment.getLong(offset);
+                               
+                               // small records go into the regular spill 
file, large records into the special code path
+                               if (pointer >= 0) {
                                        
this.recordBuffer.setReadPosition(pointer);
                                        this.serializer.copy(this.recordBuffer, 
output);
-                                       
                                }
-                               recordsLeft -= this.indexEntriesPerSegment;
-                       } else {
-                               // partially filled segment
-                               for (; recordsLeft > 0; recordsLeft--, offset 
+= this.indexEntrySize)
-                               {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
-                                       
this.recordBuffer.setReadPosition(pointer);
-                                       this.serializer.copy(this.recordBuffer, 
output);
+                               else {
+                                       
this.recordBuffer.setReadPosition(-pointer);
+                                       T record = 
this.serializer.deserialize(this.recordBuffer);
+                                       largeRecordsOutput.addRecord(record);
                                }
                        }
                }
@@ -524,7 +530,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                        if (num >= this.indexEntriesPerSegment && offset == 0) {
                                // full segment
                                for (;offset <= this.lastIndexEntryOffset; 
offset += this.indexEntrySize) {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
+                                       final long pointer = 
Math.abs(currentIndexSegment.getLong(offset));
                                        
this.recordBuffer.setReadPosition(pointer);
                                        this.serializer.copy(this.recordBuffer, 
output);
                                }
@@ -533,7 +539,7 @@ public final class NormalizedKeySorter<T> implements 
InMemorySorter<T> {
                                // partially filled segment
                                for (; num > 0 && offset <= 
this.lastIndexEntryOffset; num--, offset += this.indexEntrySize)
                                {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
+                                       final long pointer = 
Math.abs(currentIndexSegment.getLong(offset));
                                        
this.recordBuffer.setReadPosition(pointer);
                                        this.serializer.copy(this.recordBuffer, 
output);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
deleted file mode 100644
index be8511b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.RandomAccessInputView;
-import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
-import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class SortBuffer<T> implements InMemorySorter<T> {
-
-       private static final int OFFSET_LEN = 8;
-       
-       private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16;
-       
-       private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;
-       
-       private static final int MIN_REQUIRED_BUFFERS = 3;
-
-       // 
------------------------------------------------------------------------
-       //                               Members
-       // 
------------------------------------------------------------------------
-
-       private final byte[] swapBuffer;
-       
-       private final TypeSerializer<T> serializer;
-       
-       private final TypeComparator<T> comparator;
-       
-       private final SimpleCollectingOutputView recordCollector;
-       
-       private final RandomAccessInputView recordBuffer;
-       
-       private final RandomAccessInputView recordBufferForComparison;
-       
-       private MemorySegment currentSortIndexSegment;
-       
-       private final ArrayList<MemorySegment> freeMemory;
-       
-       private final ArrayList<MemorySegment> sortIndex;
-       
-       private final ArrayList<MemorySegment> recordBufferSegments;
-       
-       private long currentDataBufferOffset;
-       
-       private long sortIndexBytes;
-       
-       private int currentSortIndexOffset;
-       
-       private int numRecords;
-       
-       private final int numKeyBytes;
-       
-       private final int indexEntrySize;
-       
-       private final int indexEntriesPerSegment;
-       
-       private final int lastIndexEntryOffset;
-       
-       private final int segmentSize;
-       
-       private final int totalNumBuffers;
-       
-       private final boolean normalizedKeyFullyDetermines;
-       
-       private final boolean useNormKeyUninverted;
-       
-       
-       // 
-------------------------------------------------------------------------
-       // Constructors / Destructors
-       // 
-------------------------------------------------------------------------
-
-       public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> 
comparator, List<MemorySegment> memory) {
-               this(serializer, comparator, memory, 
DEFAULT_MAX_NORMALIZED_KEY_LEN);
-       }
-       
-       public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> 
comparator, 
-                       List<MemorySegment> memory, int maxNormalizedKeyBytes)
-       {
-               if (serializer == null || comparator == null || memory == null) 
{
-                       throw new NullPointerException();
-               }
-               if (maxNormalizedKeyBytes < 0) {
-                       throw new IllegalArgumentException("Maximal number of 
normalized key bytes must not be negative.");
-               }
-               
-               this.serializer = serializer;
-               this.comparator = comparator;
-               this.useNormKeyUninverted = !comparator.invertNormalizedKey();
-               
-               // check the size of the first buffer and record it. all 
further buffers must have the same size.
-               // the size must also be a power of 2
-               this.totalNumBuffers = memory.size();
-               if (this.totalNumBuffers < MIN_REQUIRED_BUFFERS) {
-                       throw new IllegalArgumentException("Normalized-Key 
sorter requires at least " + MIN_REQUIRED_BUFFERS + " memory buffers.");
-               }
-               this.segmentSize = memory.get(0).size();
-               
-               if (memory instanceof ArrayList<?>) {
-                       this.freeMemory = (ArrayList<MemorySegment>) memory;
-               }
-               else {
-                       this.freeMemory = new 
ArrayList<MemorySegment>(memory.size());
-                       this.freeMemory.addAll(memory);
-               }
-               
-               // create the buffer collections
-               this.sortIndex = new ArrayList<MemorySegment>(16);
-               this.recordBufferSegments = new ArrayList<MemorySegment>(16);
-               
-               // the views for the record collections
-               this.recordCollector = new 
SimpleCollectingOutputView(this.recordBufferSegments,
-                       new ListMemorySegmentSource(this.freeMemory), 
this.segmentSize);
-               this.recordBuffer = new 
RandomAccessInputView(this.recordBufferSegments, this.segmentSize);
-               this.recordBufferForComparison = new 
RandomAccessInputView(this.recordBufferSegments, this.segmentSize);
-               
-               // set up normalized key characteristics
-               if (this.comparator.supportsNormalizedKey()) {
-                       // compute the max normalized key length
-                       int numPartialKeys;
-                       try {
-                               numPartialKeys = 
this.comparator.getFlatComparators().length;
-                       } catch (Throwable t) {
-                               numPartialKeys = 1;
-                       }
-                       
-                       int maxLen = Math.min(maxNormalizedKeyBytes, 
MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
-                       
-                       this.numKeyBytes = 
Math.min(this.comparator.getNormalizeKeyLen(), maxLen);
-                       this.normalizedKeyFullyDetermines = 
!this.comparator.isNormalizedKeyPrefixOnly(this.numKeyBytes);
-               }
-               else {
-                       this.numKeyBytes = 0;
-                       this.normalizedKeyFullyDetermines = false;
-               }
-               
-               // compute the index entry size and limits
-               this.indexEntrySize = this.numKeyBytes + OFFSET_LEN;
-               this.indexEntriesPerSegment = segmentSize / this.indexEntrySize;
-               this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * 
this.indexEntrySize;
-               this.swapBuffer = new byte[this.indexEntrySize];
-               
-               // set to initial state
-               this.currentSortIndexSegment = nextMemorySegment();
-               this.sortIndex.add(this.currentSortIndexSegment);
-       }
-
-       // 
-------------------------------------------------------------------------
-       // Memory Segment
-       // 
-------------------------------------------------------------------------
-
-       /**
-        * Resets the sort buffer back to the state where it is empty. All 
contained data is discarded.
-        */
-       @Override
-       public void reset() {
-               // reset all offsets
-               this.numRecords = 0;
-               this.currentSortIndexOffset = 0;
-               this.currentDataBufferOffset = 0;
-               this.sortIndexBytes = 0;
-               
-               // return all memory
-               this.freeMemory.addAll(this.sortIndex);
-               this.freeMemory.addAll(this.recordBufferSegments);
-               this.sortIndex.clear();
-               this.recordBufferSegments.clear();
-               
-               // grab first buffers
-               this.currentSortIndexSegment = nextMemorySegment();
-               this.sortIndex.add(this.currentSortIndexSegment);
-               this.recordCollector.reset();
-       }
-
-       /**
-        * Checks whether the buffer is empty.
-        * 
-        * @return True, if no record is contained, false otherwise.
-        */
-       @Override
-       public boolean isEmpty() {
-               return this.numRecords == 0;
-       }
-       
-       /**
-        * Collects all memory segments from this sorter.
-        * 
-        * @return All memory segments from this sorter.
-        */
-       @Override
-       public List<MemorySegment> dispose() {
-               this.freeMemory.addAll(this.sortIndex);
-               this.freeMemory.addAll(this.recordBufferSegments);
-               
-               this.recordBufferSegments.clear();
-               this.sortIndex.clear();
-               
-               return this.freeMemory;
-       }
-       
-       /**
-        * Gets the total capacity of this sorter, in bytes.
-        * 
-        * @return The sorter's total capacity.
-        */
-       @Override
-       public long getCapacity() {
-               return ((long) this.totalNumBuffers) * this.segmentSize;
-       }
-       
-       /**
-        * Gets the number of bytes currently occupied in this sorter.
-        * 
-        * @return The number of bytes occupied.
-        */
-       @Override
-       public long getOccupancy() {
-               return this.currentDataBufferOffset + this.sortIndexBytes;
-       }
-
-       // 
-------------------------------------------------------------------------
-       // Retrieving and Writing
-       // 
-------------------------------------------------------------------------
-
-       /**
-        * Gets the record at the given logical position.
-        * 
-        * @param reuse The target object to deserialize the record into.
-        * @param logicalPosition The logical position of the record.
-        * @throws IOException Thrown, if an exception occurred during 
deserialization.
-        */
-       @Override
-       public T getRecord(T reuse, int logicalPosition) throws IOException {
-               return getRecordFromBuffer(reuse, readPointer(logicalPosition));
-       }
-
-       /**
-        * Writes a given record to this sort buffer. The written record will 
be appended and take
-        * the last logical position.
-        * 
-        * @param record The record to be written.
-        * @return True, if the record was successfully written, false, if the 
sort buffer was full.
-        * @throws IOException Thrown, if an error occurred while serializing 
the record into the buffers.
-        */
-       @Override
-       public boolean write(T record) throws IOException {
-               //check whether we need a new memory segment for the sort index
-               if (this.currentSortIndexOffset > this.lastIndexEntryOffset) {
-                       if (memoryAvailable()) {
-                               this.currentSortIndexSegment = 
nextMemorySegment();
-                               
this.sortIndex.add(this.currentSortIndexSegment);
-                               this.currentSortIndexOffset = 0;
-                               this.sortIndexBytes += this.segmentSize;
-                       } else {
-                               return false;
-                       }
-               }
-               
-               // add the pointer and the normalized key
-               
this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, 
this.currentDataBufferOffset);
-               if(this.numKeyBytes != 0) {
-                       this.comparator.putNormalizedKey(record, 
this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, 
this.numKeyBytes);
-               }
-               
-               // serialize the record into the data buffers
-               try {
-                       this.serializer.serialize(record, this.recordCollector);
-                       this.currentSortIndexOffset += this.indexEntrySize;
-                       this.currentDataBufferOffset = 
this.recordCollector.getCurrentOffset();
-                       this.numRecords++;
-                       return true;
-               } catch (EOFException eofex) {
-                       return false;
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //                           Access Utilities
-       // 
------------------------------------------------------------------------
-       
-       private final long readPointer(int logicalPosition) {
-               if (logicalPosition < 0 | logicalPosition >= this.numRecords) {
-                       throw new IndexOutOfBoundsException();
-               }
-               
-               final int bufferNum = logicalPosition / 
this.indexEntriesPerSegment;
-               final int segmentOffset = logicalPosition % 
this.indexEntriesPerSegment;
-               
-               return this.sortIndex.get(bufferNum).getLong(segmentOffset * 
this.indexEntrySize);
-       }
-       
-       private final T getRecordFromBuffer(T reuse, long pointer) throws 
IOException {
-               this.recordBuffer.setReadPosition(pointer);
-               return this.serializer.deserialize(reuse, this.recordBuffer);
-       }
-       
-       private final int compareRecords(long pointer1, long pointer2) {
-               this.recordBuffer.setReadPosition(pointer1);
-               this.recordBufferForComparison.setReadPosition(pointer2);
-               
-               try {
-                       return 
this.comparator.compareSerialized(this.recordBuffer, 
this.recordBufferForComparison);
-               } catch (IOException ioex) {
-                       throw new RuntimeException("Error comparing two 
records.", ioex);
-               }
-       }
-       
-       private final boolean memoryAvailable() {
-               return !this.freeMemory.isEmpty();
-       }
-       
-       private final MemorySegment nextMemorySegment() {
-               return this.freeMemory.remove(this.freeMemory.size() - 1);
-       }
-
-       // 
-------------------------------------------------------------------------
-       // Indexed Sorting
-       // 
-------------------------------------------------------------------------
-
-       @Override
-       public int compare(int i, int j) {
-               final int bufferNumI = i / this.indexEntriesPerSegment;
-               final int segmentOffsetI = (i % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final int bufferNumJ = j / this.indexEntriesPerSegment;
-               final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final MemorySegment segI = this.sortIndex.get(bufferNumI);
-               final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-               
-               int val = MemorySegment.compare(segI, segJ, segmentOffsetI + 
OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes);
-               
-               if (val != 0 || this.normalizedKeyFullyDetermines) {
-                       return this.useNormKeyUninverted ? val : -val;
-               }
-               
-               final long pointerI = segI.getLong(segmentOffsetI);
-               final long pointerJ = segJ.getLong(segmentOffsetJ);
-               
-               return compareRecords(pointerI, pointerJ);
-       }
-
-       @Override
-       public void swap(int i, int j) {
-               final int bufferNumI = i / this.indexEntriesPerSegment;
-               final int segmentOffsetI = (i % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final int bufferNumJ = j / this.indexEntriesPerSegment;
-               final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               final MemorySegment segI = this.sortIndex.get(bufferNumI);
-               final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
-               
-               MemorySegment.swapBytes(segI, segJ, this.swapBuffer, 
segmentOffsetI, segmentOffsetJ, this.indexEntrySize);
-       }
-
-       @Override
-       public int size() {
-               return this.numRecords;
-       }
-
-       // 
-------------------------------------------------------------------------
-       
-       /**
-        * Gets an iterator over all records in this buffer in their logical 
order.
-        * 
-        * @return An iterator returning the records in their logical order.
-        */
-       @Override
-       public final MutableObjectIterator<T> getIterator() {
-               return new MutableObjectIterator<T>()
-               {
-                       private final int size = size();
-                       private int current = 0;
-                       
-                       private int currentSegment = 0;
-                       private int currentOffset = 0;
-                       
-                       private MemorySegment currentIndexSegment = 
sortIndex.get(0);
-
-                       @Override
-                       public T next(T target)
-                       {
-                               if (this.current < this.size) {
-                                       this.current++;
-                                       if (this.currentOffset > 
lastIndexEntryOffset) {
-                                               this.currentOffset = 0;
-                                               this.currentIndexSegment = 
sortIndex.get(++this.currentSegment);
-                                       }
-                                       
-                                       long pointer = 
this.currentIndexSegment.getLong(this.currentOffset);
-                                       this.currentOffset += indexEntrySize;
-                                       
-                                       try {
-                                               return 
getRecordFromBuffer(target, pointer);
-                                       }
-                                       catch (IOException ioe) {
-                                               throw new RuntimeException(ioe);
-                                       }
-                               }
-                               else {
-                                       return null;
-                               }
-                       }
-               };
-       }
-       
-       // 
------------------------------------------------------------------------
-       //                Writing to a DataOutputView
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Writes the records in this buffer in their logical order to the 
given output.
-        * 
-        * @param output The output view to write the records to.
-        * @throws IOException Thrown, if an I/O exception occurred writing to 
the output view.
-        */
-       @Override
-       public void writeToOutput(final ChannelWriterOutputView output) throws 
IOException {
-               int recordsLeft = this.numRecords;
-               int currentMemSeg = 0;
-               while (recordsLeft > 0)
-               {
-                       final MemorySegment currentIndexSegment = 
this.sortIndex.get(currentMemSeg++);
-                       int offset = 0;
-                       // check whether we have a full or partially full 
segment
-                       if (recordsLeft >= this.indexEntriesPerSegment) {
-                               // full segment
-                               for (;offset <= this.lastIndexEntryOffset; 
offset += this.indexEntrySize) {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
-                                       
this.recordBuffer.setReadPosition(pointer);
-                                       this.serializer.copy(this.recordBuffer, 
output);
-                                       
-                               }
-                               recordsLeft -= this.indexEntriesPerSegment;
-                       } else {
-                               // partially filled segment
-                               for (; recordsLeft > 0; recordsLeft--, offset 
+= this.indexEntrySize)
-                               {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
-                                       
this.recordBuffer.setReadPosition(pointer);
-                                       this.serializer.copy(this.recordBuffer, 
output);
-                               }
-                       }
-               }
-       }
-       
-       /**
-        * Writes a subset of the records in this buffer in their logical order 
to the given output.
-        * 
-        * @param output The output view to write the records to.
-        * @param start The logical start position of the subset.
-        * @param num The number of elements to write.
-        * @throws IOException Thrown, if an I/O exception occurred writing to 
the output view.
-        */
-       @Override
-       public void writeToOutput(final ChannelWriterOutputView output, final 
int start, int num) throws IOException {
-               int currentMemSeg = start / this.indexEntriesPerSegment;
-               int offset = (start % this.indexEntriesPerSegment) * 
this.indexEntrySize;
-               
-               while (num > 0)
-               {
-                       final MemorySegment currentIndexSegment = 
this.sortIndex.get(currentMemSeg++);
-                       // check whether we have a full or partially full 
segment
-                       if (num >= this.indexEntriesPerSegment && offset == 0) {
-                               // full segment
-                               for (;offset <= this.lastIndexEntryOffset; 
offset += this.indexEntrySize) {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
-                                       
this.recordBuffer.setReadPosition(pointer);
-                                       this.serializer.copy(this.recordBuffer, 
output);
-                               }
-                               num -= this.indexEntriesPerSegment;
-                       } else {
-                               // partially filled segment
-                               for (; num > 0 && offset <= 
this.lastIndexEntryOffset; num--, offset += this.indexEntrySize)
-                               {
-                                       final long pointer = 
currentIndexSegment.getLong(offset);
-                                       
this.recordBuffer.setReadPosition(pointer);
-                                       this.serializer.copy(this.recordBuffer, 
output);
-                               }
-                       }
-                       offset = 0;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 51cc1cf..a534399 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -49,7 +49,6 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
@@ -78,7 +77,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
        
        /** The minimum number of segments that are required for the sort to 
operate. */
        protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
-
+       
        // 
------------------------------------------------------------------------
        //                                  Threads
        // 
------------------------------------------------------------------------
@@ -670,12 +669,13 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
         * Class representing buffers that circulate between the reading, 
sorting and spilling thread.
         */
        protected static final class CircularElement<E> {
+               
                final int id;
                final InMemorySorter<E> buffer;
 
                public CircularElement() {
-                       this.buffer = null;
                        this.id = -1;
+                       this.buffer = null;
                }
 
                public CircularElement(int id, InMemorySorter<E> buffer) {
@@ -770,8 +770,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                                internalHandleException(new IOException("Thread 
'" + getName() + "' terminated due to an exception: "
                                        + t.getMessage(), t));
                        }
-                       finally {
-                       }
                }
 
                /**
@@ -915,6 +913,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                                // write the last leftover pair, if we have one
                                if (leftoverRecord != null) {
                                        if (!buffer.write(leftoverRecord)) {
+                                               // did not fit in a fresh 
buffer, must be large...
                                                if (this.largeRecords != null) {
                                                        
this.largeRecords.addRecord(leftoverRecord);
                                                } else {
@@ -923,6 +922,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                                                }
                                                buffer.reset();
                                        }
+                                       
                                        leftoverRecord = null;
                                }
                                
@@ -944,6 +944,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                                                        fullBuffer = true;
                                                        break;
                                                }
+                                               
+                                               // successfully added record
+                                               
                                                if (bytesUntilSpilling - 
buffer.getOccupancy() <= 0) {
                                                        bytesUntilSpilling = 0;
                                                        
@@ -1303,7 +1306,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Spilling buffer " + 
element.id + ".");
                                }
-                               element.buffer.writeToOutput(output);
+                               
+                               element.buffer.writeToOutput(output, 
largeRecordHandler);
+                               
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Spilled buffer " + 
element.id + ".");
                                }
@@ -1655,166 +1660,8 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                }
        }
        
-       /**
-        *
-        */
-       public static final class InputDataCollector<E> implements Collector<E>
-       {
-               private final CircularQueues<E> queues;         // the queues 
used to pass buffers
-               
-               private InMemorySorter<E> currentBuffer;
-               
-               private CircularElement<E> currentElement;
-               
-               private long bytesUntilSpilling;                        // 
number of bytes left before we signal to spill
-               
-               private boolean spillingInThisBuffer;
-               
-               private volatile boolean running;
-               
-
-               public InputDataCollector(CircularQueues<E> queues, long 
startSpillingBytes)
-               {
-                       this.queues = queues;
-                       this.bytesUntilSpilling = startSpillingBytes;
-                       this.running = true;
-                       
-                       grabBuffer();
-               }
+       protected static final class ChannelWithBlockCount {
                
-               private void grabBuffer()
-               {
-                       while (this.currentElement == null) {
-                               try {
-                                       this.currentElement = 
this.queues.empty.take();
-                               }
-                               catch (InterruptedException iex) {
-                                       if (this.running) {
-                                               LOG.error("Reading thread was 
interrupted (without being shut down) while grabbing a buffer. " +
-                                                               "Retrying to 
grab buffer...");
-                                       } else {
-                                               return;
-                                       }
-                               }
-                       }
-                       
-                       this.currentBuffer = this.currentElement.buffer;
-                       if (!this.currentBuffer.isEmpty()) {
-                               throw new RuntimeException("New sort-buffer is 
not empty.");
-                       }
-                       
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Retrieved empty read buffer " + 
this.currentElement.id + ".");
-                       }
-                       
-                       this.spillingInThisBuffer = 
this.currentBuffer.getCapacity() <= this.bytesUntilSpilling;
-               }
-               
-
-               @Override
-               public void collect(E record)
-               {
-                       try {
-                               if (this.spillingInThisBuffer) {
-                                       if (this.currentBuffer.write(record)) {
-                                               if (this.bytesUntilSpilling - 
this.currentBuffer.getOccupancy() <= 0) {
-                                                       this.bytesUntilSpilling 
= 0;
-                                                       // send the sentinel
-                                                       
this.queues.sort.add(UnilateralSortMerger.<E>spillingMarker());
-                                               }
-                                               return;
-                                       }
-                               }
-                               else {
-                                       // no spilling in this buffer
-                                       if (this.currentBuffer.write(record)) {
-                                               return;
-                                       }
-                               }
-                               
-                               if (this.bytesUntilSpilling > 0) {
-                                       this.bytesUntilSpilling -= 
this.currentBuffer.getCapacity();
-                                       if (this.bytesUntilSpilling <= 0) {
-                                               this.bytesUntilSpilling = 0;
-                                               // send the sentinel
-                                               
this.queues.sort.add(UnilateralSortMerger.<E>spillingMarker());
-                                       }
-                               }
-                               
-                               // we came here when the buffer could not be 
written. send it to the sorter
-                               // send the buffer
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Emitting full buffer from 
reader thread: " + this.currentElement.id + ".");
-                               }
-                               this.queues.sort.add(this.currentElement);
-                               this.currentElement = null;
-                               
-                               // we need a new buffer. grab the next one
-                               while (this.running && this.currentElement == 
null) {
-                                       try {
-                                               this.currentElement = 
this.queues.empty.take();
-                                       }
-                                       catch (InterruptedException iex) {
-                                               if (this.running) {
-                                                       LOG.error("Reading 
thread was interrupted (without being shut down) while grabbing a buffer. " +
-                                                                       
"Retrying to grab buffer...");
-                                               } else {
-                                                       return;
-                                               }
-                                       }
-                               }
-                               if (!this.running) {
-                                       return;
-                               }
-                               
-                               this.currentBuffer = this.currentElement.buffer;
-                               if (!this.currentBuffer.isEmpty()) {
-                                       throw new RuntimeException("BUG: New 
sort-buffer is not empty.");
-                               }
-                               
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Retrieved empty read buffer 
" + this.currentElement.id + ".");
-                               }
-                               // write the record
-                               if (!this.currentBuffer.write(record)) {
-                                       throw new RuntimeException("Record 
could not be written to empty sort-buffer: Serialized record exceeds buffer 
capacity.");
-                               }
-                       }
-                       catch (IOException ioex) {
-                               throw new RuntimeException("BUG: An error 
occurred while writing a record to the sort buffer: " + 
-                                               ioex.getMessage(), ioex);
-                       }
-               }
-               
-
-               @Override
-               public void close()
-               {
-                       if (this.running) {
-                               this.running = false;
-                               
-                               if (this.currentBuffer != null && 
this.currentElement != null) {
-                                       if (this.currentBuffer.isEmpty()) {
-                                               
this.queues.empty.add(this.currentElement);
-                                       }
-                                       else {
-                                               
this.queues.sort.add(this.currentElement);
-                                               if (LOG.isDebugEnabled()) {
-                                                       LOG.debug("Emitting 
last buffer from input collector: " + this.currentElement.id + ".");
-                                               }
-                                       }
-                               }
-                               
-                               this.currentBuffer = null;
-                               this.currentElement = null;
-                               
-                               
this.queues.sort.add(UnilateralSortMerger.<E>endMarker());
-                       }
-               }
-       }
-       
-       protected static final class ChannelWithBlockCount
-       {
                private final FileIOChannel.ID channel;
                private final int blockCount;
                

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
index 6782b91..999c4b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.util;
 
+import java.util.NoSuchElementException;
+
 /**
  * Minimal implementation of an array-backed list of ints
  */
@@ -42,16 +44,12 @@ public class IntArrayList {
                return true;
        }
 
-       public int removeInt(int index) {
-               if(index >= size) {
-                       throw new IndexOutOfBoundsException("Index (" + index + 
") is greater than or equal to list size (" + size + ")");
-               }
-               final int old = array[ index ];
-               size--;
-               if(index != size) {
-                       System.arraycopy(array, index+1, array, index, 
size-index );
+       public int removeLast() {
+               if (size == 0) {
+                       throw new NoSuchElementException();
                }
-               return old;
+               --size;
+               return array[size];
        }
        
        public void clear() {
@@ -59,7 +57,7 @@ public class IntArrayList {
        }
        
        public boolean isEmpty() {
-               return (size==0);
+               return size == 0;
        }
        
        private void grow(final int length) {
@@ -71,4 +69,14 @@ public class IntArrayList {
                }
        }
 
+       public static final IntArrayList EMPTY = new IntArrayList(0) {
+               
+               public boolean add(int number) {
+                       throw new UnsupportedOperationException();
+               }
+               
+               public int removeLast() {
+                       throw new UnsupportedOperationException();
+               };
+       };
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 914359d..6e35ad0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -18,29 +18,12 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import static org.junit.Assert.*;
-
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
 import java.util.Comparator;
-import java.util.Random;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -49,18 +32,21 @@ import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
 import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
 import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Key;
+import org.apache.flink.runtime.operators.testutils.TestData.Value;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
 import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ExternalSortITCase {
@@ -77,7 +63,7 @@ public class ExternalSortITCase {
 
        private static final int NUM_PAIRS = 200000;
 
-       private static final int MEMORY_SIZE = 1024 * 1024 * 78;
+       private static final int MEMORY_SIZE = 1024 * 1024 * 78;        
        
        private final AbstractInvokable parentTask = new DummyInvokable();
 
@@ -256,7 +242,7 @@ public class ExternalSortITCase {
                merger.close();
        }
 
-//     @Test
+       @Test
        public void testSpillingSortWithIntermediateMerge() throws Exception {
                // amount of pairs
                final int PAIRS = 10000000;
@@ -310,7 +296,7 @@ public class ExternalSortITCase {
                merger.close();
        }
        
-//     @Test
+       @Test
        public void testSpillingSortWithIntermediateMergeIntPair() throws 
Exception {
                // amount of pairs
                final int PAIRS = 50000000;
@@ -361,214 +347,4 @@ public class ExternalSortITCase {
                Assert.assertEquals("Not all pairs were read back in.", PAIRS, 
pairsRead);
                merger.close();
        }
-       
-       @Test
-       public void testSortWithLongRecordsOnly() {
-               try {
-                       final int NUM_RECORDS = 10;
-                       
-                       final TypeInformation<?>[] types = new 
TypeInformation<?>[] {
-                                       BasicTypeInfo.LONG_TYPE_INFO,
-                                       new 
ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
-                               };
-                       
-                       final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> 
typeInfo = 
-                                                               new 
TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
-                       final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> 
serializer = typeInfo.createSerializer();
-                       final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> 
comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
-                       
-                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source = 
-                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>()
-                       {
-                               private final Random rnd = new Random();
-                               private int num = 0;
-                               
-                               @Override
-                               public Tuple2<Long, SomeMaybeLongValue> 
next(Tuple2<Long, SomeMaybeLongValue> reuse) {
-                                       if (num++ < NUM_RECORDS) {
-                                               long val = rnd.nextLong();
-                                               return new Tuple2<Long, 
SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
-                                       }
-                                       else {
-                                               return null;
-                                       }
-                                       
-                               }
-                       };
-                       
-                       @SuppressWarnings("unchecked")
-                       Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new 
UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
-                                       this.memoryManager, this.ioManager, 
-                                       source, this.parentTask,
-                                       new 
RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, 
(Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-                                       comparator, 1.0, 1, 128, 0.7f);
-                       
-                       // check order
-                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
iterator = sorter.getIterator();
-                       
-                       Tuple2<Long, SomeMaybeLongValue> val = 
serializer.createInstance();
-                       
-                       long prevKey = Long.MAX_VALUE;
-
-                       for (int i = 0; i < NUM_RECORDS; i++) {
-                               val = iterator.next(val);
-                               
-                               assertTrue(val.f0 <= prevKey);
-                               assertTrue(val.f0.intValue() == val.f1.val());
-                       }
-                       
-                       assertNull(iterator.next(val));
-                       
-                       sorter.close();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSortWithLongAndShortRecordsMixed() {
-               try {
-                       final int NUM_RECORDS = 1000000;
-                       final int LARGE_REC_INTERVAL = 100000;
-                       
-                       final TypeInformation<?>[] types = new 
TypeInformation<?>[] {
-                                       BasicTypeInfo.LONG_TYPE_INFO,
-                                       new 
ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
-                               };
-                       
-                       final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> 
typeInfo = 
-                                                               new 
TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
-                       final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> 
serializer = typeInfo.createSerializer();
-                       final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> 
comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
-                       
-                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source = 
-                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>()
-                       {
-                               private final Random rnd = new Random();
-                               private int num = -1;
-                               
-                               @Override
-                               public Tuple2<Long, SomeMaybeLongValue> 
next(Tuple2<Long, SomeMaybeLongValue> reuse) {
-                                       if (++num < NUM_RECORDS) {
-                                               long val = rnd.nextLong();
-                                               return new Tuple2<Long, 
SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % 
LARGE_REC_INTERVAL == 0));
-                                       }
-                                       else {
-                                               return null;
-                                       }
-                                       
-                               }
-                       };
-                       
-                       @SuppressWarnings("unchecked")
-                       Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new 
UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
-                                       this.memoryManager, this.ioManager, 
-                                       source, this.parentTask,
-                                       new 
RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, 
(Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-                                       comparator, 1.0, 1, 128, 0.7f);
-                       
-                       // check order
-                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
iterator = sorter.getIterator();
-                       
-                       Tuple2<Long, SomeMaybeLongValue> val = 
serializer.createInstance();
-                       
-                       long prevKey = Long.MAX_VALUE;
-
-                       for (int i = 0; i < NUM_RECORDS; i++) {
-                               val = iterator.next(val);
-                               
-                               assertTrue(val.f0 <= prevKey);
-                               assertTrue(val.f0.intValue() == val.f1.val());
-                       }
-                       
-                       assertNull(iterator.next(val));
-                       
-                       sorter.close();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class SomeMaybeLongValue implements 
org.apache.flink.types.Value {
-               
-               private static final long serialVersionUID = 1L;
-
-               private static final byte[] BUFFER = new byte[100000000];
-               
-               static {
-                       for (int i = 0; i < BUFFER.length; i++) {
-                               BUFFER[i] = (byte) i;
-                       }
-               }
-               
-               private int val;
-               
-               private boolean isLong;
-               
-
-               public SomeMaybeLongValue() {
-                       this.isLong = true;
-               }
-               
-               public SomeMaybeLongValue(int val) {
-                       this.val = val;
-                       this.isLong = true;
-               }
-               
-               public SomeMaybeLongValue(int val, boolean isLong) {
-                       this.val = val;
-                       this.isLong = isLong;
-               }
-               
-               public int val() {
-                       return val;
-               }
-               
-               public boolean isLong() {
-                       return isLong;
-               }
-               
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       val = in.readInt();
-                       isLong = in.readBoolean();
-                       
-                       if (isLong) {
-                               for (int i = 0; i < BUFFER.length; i++) {
-                                       byte b = in.readByte();
-                                       assertEquals(BUFFER[i], b);
-                               }
-                       }
-               }
-               
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       out.writeInt(val);
-                       out.writeBoolean(isLong);
-                       if (isLong) {
-                               out.write(BUFFER);
-                       }
-               }
-               
-               @Override
-               public int hashCode() {
-                       return val;
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       return (obj instanceof SomeMaybeLongValue) && 
((SomeMaybeLongValue) obj).val == this.val;
-               }
-               
-               @Override
-               public String toString() {
-                       return isLong ? "Large Value" : "Small Value";
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
new file mode 100644
index 0000000..33d15ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class ExternalSortLargeRecordsITCase {
+
+       private static final int MEMORY_SIZE = 1024 * 1024 * 78;
+
+       private final AbstractInvokable parentTask = new DummyInvokable();
+
+       private IOManager ioManager;
+
+       private MemoryManager memoryManager;
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Creates a fresh memory manager ({@code MEMORY_SIZE} bytes) and an asynchronous I/O manager
+        * for each test.
+        */
+       @Before
+       public void beforeTest() {
+               this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+               this.ioManager = new IOManagerAsync();
+       }
+
+       /**
+        * Shuts down the I/O manager and checks that it shut down properly, then checks that all
+        * memory segments were returned to the memory manager. The memory manager is shut down in a
+        * {@code finally} block, so it is shut down even when one of the checks fails.
+        */
+       @After
+       public void afterTest() {
+               try {
+                       // may be null if beforeTest() failed before creating it
+                       if (this.ioManager != null) {
+                               this.ioManager.shutdown();
+                               if (!this.ioManager.isProperlyShutDown()) {
+                                       Assert.fail("I/O Manager was not properly shut down.");
+                               }
+                               this.ioManager = null;
+                       }
+               }
+               finally {
+                       if (this.memoryManager != null) {
+                               try {
+                                       Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.",
+                                               this.memoryManager.verifyEmpty());
+                               }
+                               finally {
+                                       this.memoryManager.shutdown();
+                                       this.memoryManager = null;
+                               }
+                       }
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Sorts records whose values all carry the large payload of {@link SomeMaybeLongValue} and
+        * verifies that every record comes back exactly once, in non-increasing key order, with the
+        * value that was generated for its key.
+        */
+       @Test
+       public void testSortWithLongRecordsOnly() {
+               try {
+                       final int NUM_RECORDS = 10;
+
+                       final TypeInformation<?>[] types = new TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+                               };
+
+                       final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo =
+                                                               new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+                       final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
+                       // key on field 0 with order flag 'false'; the loop below expects non-increasing keys
+                       final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+
+                       // NUM_RECORDS random keys; each value stores the (int) cast of its key and is a "long" value
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source =
+                                       new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
+                       {
+                               private final Random rnd = new Random();
+                               private int num = 0;
+
+                               @Override
+                               public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+                                       if (num++ < NUM_RECORDS) {
+                                               long val = rnd.nextLong();
+                                               return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
+                                       }
+                                       else {
+                                               return null;
+                                       }
+                               }
+                       };
+
+                       @SuppressWarnings("unchecked")
+                       Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+                                       this.memoryManager, this.ioManager,
+                                       source, this.parentTask,
+                                       new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+                                       comparator, 1.0, 1, 128, 0.7f);
+
+                       // check order
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
+
+                       Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
+
+                       long prevKey = Long.MAX_VALUE;
+
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               val = iterator.next(val);
+                               Assert.assertNotNull("Sorter returned fewer records than were produced.", val);
+
+                               assertTrue("Records are not in descending key order.", val.f0 <= prevKey);
+                               assertTrue(val.f0.intValue() == val.f1.val());
+                               // advance the reference key; without this the order check compares against MAX_VALUE only
+                               prevKey = val.f0;
+                       }
+
+                       assertNull(iterator.next(val));
+
+                       sorter.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Sorts mostly small records with a large {@link SomeMaybeLongValue} every
+        * {@code LARGE_REC_INTERVAL} records and verifies that every record comes back exactly once,
+        * in non-increasing key order, with the value that was generated for its key.
+        */
+       @Test
+       public void testSortWithLongAndShortRecordsMixed() {
+               try {
+                       final int NUM_RECORDS = 1000000;
+                       final int LARGE_REC_INTERVAL = 100000;
+
+                       final TypeInformation<?>[] types = new TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+                               };
+
+                       final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo =
+                                                               new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+                       final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
+                       // key on field 0 with order flag 'false'; the loop below expects non-increasing keys
+                       final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+
+                       // NUM_RECORDS random keys; every LARGE_REC_INTERVAL-th value (starting at 0) is a "long" value
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source =
+                                       new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
+                       {
+                               private final Random rnd = new Random();
+                               private int num = -1;
+
+                               @Override
+                               public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+                                       if (++num < NUM_RECORDS) {
+                                               long val = rnd.nextLong();
+                                               return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0));
+                                       }
+                                       else {
+                                               return null;
+                                       }
+                               }
+                       };
+
+                       @SuppressWarnings("unchecked")
+                       Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+                                       this.memoryManager, this.ioManager,
+                                       source, this.parentTask,
+                                       new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+                                       comparator, 1.0, 1, 128, 0.7f);
+
+                       // check order
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
+
+                       Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
+
+                       long prevKey = Long.MAX_VALUE;
+
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               val = iterator.next(val);
+                               Assert.assertNotNull("Sorter returned fewer records than were produced.", val);
+
+                               assertTrue("Records are not in descending key order.", val.f0 <= prevKey);
+                               assertTrue(val.f0.intValue() == val.f1.val());
+                               // advance the reference key; without this the order check compares against MAX_VALUE only
+                               prevKey = val.f0;
+                       }
+
+                       assertNull(iterator.next(val));
+
+                       sorter.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       /**
+        * Sorts a mix of small, medium ({@link SmallOrMediumOrLargeValue#MEDIUM_SIZE}) and large
+        * ({@link SmallOrMediumOrLargeValue#LARGE_SIZE}) records and verifies that every record comes
+        * back exactly once, in non-increasing key order, with the value generated for its key.
+        */
+       @Test
+       public void testSortWithShortMediumAndLargeRecords() {
+               try {
+                       final int NUM_RECORDS = 50000;
+                       final int LARGE_REC_INTERVAL = 10000;
+                       final int MEDIUM_REC_INTERVAL = 500;
+
+                       final TypeInformation<?>[] types = new TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new ValueTypeInfo<SmallOrMediumOrLargeValue>(SmallOrMediumOrLargeValue.class)
+                               };
+
+                       final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo =
+                                                               new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types);
+
+                       final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
+                       // key on field 0 with order flag 'false'; the loop below expects non-increasing keys
+                       final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+
+                       // NUM_RECORDS random keys; the large interval takes precedence over the medium one
+                       MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source =
+                                       new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>()
+                       {
+                               private final Random rnd = new Random();
+                               private int num = -1;
+
+                               @Override
+                               public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
+                                       if (++num < NUM_RECORDS) {
+
+                                               int size;
+                                               if (num % LARGE_REC_INTERVAL == 0) {
+                                                       size = SmallOrMediumOrLargeValue.LARGE_SIZE;
+                                               } else if (num % MEDIUM_REC_INTERVAL == 0) {
+                                                       size = SmallOrMediumOrLargeValue.MEDIUM_SIZE;
+                                               } else {
+                                                       size = SmallOrMediumOrLargeValue.SMALL_SIZE;
+                                               }
+
+                                               long val = rnd.nextLong();
+                                               return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, size));
+                                       }
+                                       else {
+                                               return null;
+                                       }
+                               }
+                       };
+
+                       @SuppressWarnings("unchecked")
+                       Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>(
+                                       this.memoryManager, this.ioManager,
+                                       source, this.parentTask,
+                                       new RuntimeStatefulSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
+                                       comparator, 1.0, 1, 128, 0.7f);
+
+                       // check order
+                       MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
+
+                       Tuple2<Long, SmallOrMediumOrLargeValue> val = serializer.createInstance();
+
+                       long prevKey = Long.MAX_VALUE;
+
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               val = iterator.next(val);
+                               Assert.assertNotNull("Sorter returned fewer records than were produced.", val);
+
+                               assertTrue("Records are not in descending key order.", val.f0 <= prevKey);
+                               assertTrue(val.f0.intValue() == val.f1.val());
+                               // advance the reference key; without this the order check compares against MAX_VALUE only
+                               prevKey = val.f0;
+                       }
+
+                       assertNull(iterator.next(val));
+
+                       sorter.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Test value holding an int, optionally followed by a large fixed payload ({@code BUFFER})
+        * when serialized. Deserialization checks every payload byte against {@code BUFFER}.
+        * Equality and hash code depend only on the int.
+        */
+       public static final class SomeMaybeLongValue implements org.apache.flink.types.Value {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Payload appended for "long" values; element k holds (byte) k. */
+               private static final byte[] BUFFER = new byte[100 * 1024 * 1024];
+
+               static {
+                       int idx = 0;
+                       while (idx < BUFFER.length) {
+                               BUFFER[idx] = (byte) idx;
+                               idx++;
+                       }
+               }
+
+               private int val;
+
+               private boolean isLong;
+
+               /** Creates a "long" value with int 0 (fields are overwritten by {@link #read}). */
+               public SomeMaybeLongValue() {
+                       this(0, true);
+               }
+
+               /** Creates a "long" value carrying the given int. */
+               public SomeMaybeLongValue(int val) {
+                       this(val, true);
+               }
+
+               /** Creates a value carrying the given int; {@code isLong} controls whether the payload is written. */
+               public SomeMaybeLongValue(int val, boolean isLong) {
+                       this.val = val;
+                       this.isLong = isLong;
+               }
+
+               public int val() {
+                       return this.val;
+               }
+
+               public boolean isLong() {
+                       return this.isLong;
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       this.val = in.readInt();
+                       this.isLong = in.readBoolean();
+
+                       if (!this.isLong) {
+                               return;
+                       }
+                       // verify the payload byte by byte
+                       for (byte expected : BUFFER) {
+                               assertEquals(expected, in.readByte());
+                       }
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       out.writeInt(this.val);
+                       out.writeBoolean(this.isLong);
+
+                       if (this.isLong) {
+                               out.write(BUFFER);
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       return this.val;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (!(obj instanceof SomeMaybeLongValue)) {
+                               return false;
+                       }
+                       SomeMaybeLongValue that = (SomeMaybeLongValue) obj;
+                       return that.val == this.val;
+               }
+
+               @Override
+               public String toString() {
+                       if (this.isLong) {
+                               return "Large Value";
+                       }
+                       return "Small Value";
+               }
+       }
+       
+       /**
+        * Test value holding an int plus {@code size} filler bytes when serialized. The filler byte at
+        * position k is (byte) k, and deserialization checks every filler byte.
+        * Equality compares both the int and the size; the hash code uses only the int.
+        */
+       public static final class SmallOrMediumOrLargeValue implements org.apache.flink.types.Value {
+
+               private static final long serialVersionUID = 1L;
+
+               // not referenced elsewhere in this file
+               public static final int TYPE_SMALL = 0;
+               public static final int TYPE_MEDIUM = 1;
+               public static final int TYPE_LARGE = 2;
+
+               // filler byte counts used by the tests
+               public static final int SMALL_SIZE = 0;
+               public static final int MEDIUM_SIZE = 12 * 1024 * 1024;
+               public static final int LARGE_SIZE = 100 * 1024 * 1024;
+
+               private int val;
+
+               private int size;
+
+               /** Creates a small value with int 0 (fields are overwritten by {@link #read}). */
+               public SmallOrMediumOrLargeValue() {
+                       this(0, SMALL_SIZE);
+               }
+
+               /** Creates a small value carrying the given int. */
+               public SmallOrMediumOrLargeValue(int val) {
+                       this(val, SMALL_SIZE);
+               }
+
+               /** Creates a value carrying the given int followed by {@code size} filler bytes. */
+               public SmallOrMediumOrLargeValue(int val, int size) {
+                       this.val = val;
+                       this.size = size;
+               }
+
+               public int val() {
+                       return this.val;
+               }
+
+               public int getSize() {
+                       return this.size;
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       this.val = in.readInt();
+                       this.size = in.readInt();
+
+                       // verify the filler bytes written by write()
+                       for (int pos = 0; pos < this.size; pos++) {
+                               assertEquals((byte) pos, in.readByte());
+                       }
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       out.writeInt(this.val);
+                       out.writeInt(this.size);
+
+                       for (int pos = 0; pos < this.size; pos++) {
+                               out.write((byte) pos);
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       return this.val;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (!(obj instanceof SmallOrMediumOrLargeValue)) {
+                               return false;
+                       }
+                       SmallOrMediumOrLargeValue that = (SmallOrMediumOrLargeValue) obj;
+                       return this.val == that.val && this.size == that.size;
+               }
+
+               @Override
+               public String toString() {
+                       return String.format("Value %d (%d bytes)", this.val, this.size);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 55d01d2..a711d80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.BufferedReader;
@@ -48,9 +47,6 @@ public class MassiveStringSortingITCase {
 
        private static final long SEED = 347569784659278346L;
        
-       @SuppressWarnings("unused")
-       private static final char LINE_BREAK = '\n';
-       
        
        public void testStringSorting() {
                File input = null;

Reply via email to