[FLINK-1296] [runtime] Add sorter support for very large records

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48256560
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48256560
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48256560

Branch: refs/heads/master
Commit: 482565608414fea8df7ed86d5d9a8d90fdfcd4f7
Parents: 996d404
Author: Stephan Ewen <[email protected]>
Authored: Tue Dec 2 22:20:26 2014 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Jan 21 12:01:35 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/java/tuple/Tuple.java  |  28 +
 .../flink/api/java/tuple/TupleGenerator.java    |  12 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java |  13 +-
 .../RuntimeStatefulSerializerFactory.java       |   3 +
 .../typeutils/runtime/TupleSerializerBase.java  |   3 +
 flink-runtime/pom.xml                           |  14 +-
 .../runtime/io/disk/FileChannelOutputView.java  |   4 +
 .../io/disk/SeekableFileChannelInputView.java   |   2 +-
 .../runtime/io/disk/iomanager/IOManager.java    |  32 +-
 .../sort/CombiningUnilateralSortMerger.java     |  14 +-
 .../operators/sort/LargeRecordHandler.java      | 451 ++++++++++++++++
 .../runtime/operators/sort/SortBuffer.java      | 510 +++++++++++++++++++
 .../operators/sort/UnilateralSortMerger.java    | 433 +++++++++-------
 .../operators/sort/ExternalSortITCase.java      | 227 ++++++++-
 .../sort/LargeRecordHandlerITCase.java          | 277 ++++++++++
 .../operators/sort/LargeRecordHandlerTest.java  | 271 ++++++++++
 16 files changed, 2062 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java 
b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index 5e2b8a2..145d215 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -81,4 +81,32 @@ public abstract class Tuple implements java.io.Serializable {
         * @return The number of fields in the tuple.
         */
        public abstract int getArity();
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Gets the class corresponding to the tuple of the given arity 
(dimensions). For
example, {@code getTupleClass(3)} will return {@code 
Tuple3.class}.
+        * 
+        * @param arity The arity of the tuple class to get.
+        * @return The tuple class with the given arity.
+        */
+       @SuppressWarnings("unchecked")
+       public static Class<? extends Tuple> getTupleClass(int arity) {
+               if (arity < 1 || arity > MAX_ARITY) {
+                       throw new IllegalArgumentException("The tuple arity 
must be in [1, " + MAX_ARITY + "].");
+               }
+               return (Class<? extends Tuple>) CLASSES[arity - 1];
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
 
+       // The following lines are generated.
+       // 
--------------------------------------------------------------------------------------------
+       
+       // BEGIN_OF_TUPLE_DEPENDENT_CODE        
+       // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+       private static final Class<?>[] CLASSES = new Class<?>[] {
+               Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, 
Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, 
Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, 
Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, 
Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, 
Tuple25.class
+       };
+       // END_OF_TUPLE_DEPENDENT_CODE
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index 3bfa94b..ed429e3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -53,9 +53,9 @@ class TupleGenerator {
        private static final String CSV_READER_CLASSNAME = "CsvReader";
 
        // Parameters for TupleTypeInfo
-       private static final String TUPLE_TYPE_INFO_PACKAGE = 
"org.apache.flink.api.java.typeutils";
+       private static final String TUPLE_PACKAGE = 
"org.apache.flink.api.java.tuple";
 
-       private static final String TUPLE_TYPE_INFO_CLASSNAME = "TupleTypeInfo";
+       private static final String TUPLE_CLASSNAME = "Tuple";
 
        // Parameters for ProjectOperator
        private static final String PROJECT_OPERATOR_PACKAGE = 
"org.apache.flink.api.java.operators";
@@ -92,7 +92,7 @@ class TupleGenerator {
 
                modifyCsvReader(root);
 
-               modifyTupleTypeInfo(root);
+               modifyTupleType(root);
 
                modifyProjectOperator(root);
 
@@ -395,7 +395,7 @@ class TupleGenerator {
                insertCodeIntoFile(sb.toString(), projectOperatorClass);
        }
 
-       private static void modifyTupleTypeInfo(File root) throws IOException {
+       private static void modifyTupleType(File root) throws IOException {
                // generate code
                StringBuilder sb = new StringBuilder();
                sb.append("\tprivate static final Class<?>[] CLASSES = new 
Class<?>[] {\n\t\t");
@@ -408,8 +408,8 @@ class TupleGenerator {
                sb.append("\n\t};");
 
                // insert code into file
-               File dir = getPackage(root, TUPLE_TYPE_INFO_PACKAGE);
-               File tupleTypeInfoClass = new File(dir, 
TUPLE_TYPE_INFO_CLASSNAME + ".java");
+               File dir = getPackage(root, TUPLE_PACKAGE);
+               File tupleTypeInfoClass = new File(dir, TUPLE_CLASSNAME + 
".java");
                insertCodeIntoFile(sb.toString(), tupleTypeInfoClass);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 177f033..af258e2 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -37,7 +37,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        
        @SuppressWarnings("unchecked")
        public TupleTypeInfo(TypeInformation<?>... types) {
-               this((Class<T>) CLASSES[types.length - 1], types);
+               this((Class<T>) Tuple.getTupleClass(types.length), types);
        }
 
        public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
@@ -152,15 +152,4 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
                TupleTypeInfo<X> tupleInfo = (TupleTypeInfo<X>) new 
TupleTypeInfo<Tuple>(infos);
                return tupleInfo;
        }
-
-       // 
--------------------------------------------------------------------------------------------
 
-       // The following lines are generated.
-       // 
--------------------------------------------------------------------------------------------
-       
-       // BEGIN_OF_TUPLE_DEPENDENT_CODE        
-       // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
-       private static final Class<?>[] CLASSES = new Class<?>[] {
-               Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, 
Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, 
Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, 
Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, 
Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, 
Tuple25.class
-       };
-       // END_OF_TUPLE_DEPENDENT_CODE
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
index 19cd3b7..dcc31bf 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
@@ -55,6 +55,9 @@ public final class RuntimeStatefulSerializerFactory<T> 
implements TypeSerializer
                }
        }
 
+       public void setClassLoader(ClassLoader loader) {
+               this.loader = loader;
+       }
 
        @Override
        public void writeParametersToConfig(Configuration config) {

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 08df7d3..a30eda3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -55,6 +55,9 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
                this.stateful = stateful;
        }
        
+       public Class<T> getTupleClass() {
+               return this.tupleClass;
+       }
        
        @Override
        public boolean isImmutableType() {

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 692e6e1..5ac12d5 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -40,6 +40,12 @@ under the License.
                        <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
                </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
 
                <dependency>
                        <groupId>commons-cli</groupId>
@@ -70,7 +76,6 @@ under the License.
                        <version>1.8.1</version>
                </dependency>
 
-
                <dependency>
                        <groupId>io.netty</groupId>
                        <artifactId>netty-all</artifactId>
@@ -92,13 +97,6 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
                        <groupId>org.scala-lang</groupId>
                        <artifactId>scala-library</artifactId>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
index 2b8b728..e04759c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
@@ -128,6 +128,10 @@ public class FileChannelOutputView extends 
AbstractPagedOutputView {
                return bytesInLatestSegment;
        }
        
+       public long getWriteOffset() {
+               return ((long) numBlocksWritten) * segmentSize + 
getCurrentPositionInSegment();
+       }
+       
        @Override
        protected MemorySegment nextSegment(MemorySegment current, int 
posInSegment) throws IOException {
                if (current != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
index e97a1ff..6098fdb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -116,7 +116,7 @@ public class SeekableFileChannelInputView extends 
AbstractPagedInputView {
                reader = ioManager.createBlockChannelReader(channelId);
                
                if (block > 0) {
-                       reader.seekToPosition(block * segmentSize);
+                       reader.seekToPosition(((long) block) * segmentSize);
                }
                
                this.numBlocksRemaining = this.numBlocksTotal - block;

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index ff2fb85..e58c4d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -97,6 +97,19 @@ public abstract class IOManager {
                return new FileIOChannel.Enumerator(this.paths, this.random);
        }
 
+       /**
+        * Deletes the file underlying the given channel. If the channel is 
still open, this
+        * call may fail.
+        * 
+        * @param channel The channel to be deleted.
+        * @throws IOException Thrown if the deletion fails.
+        */
+       public void deleteChannel(FileIOChannel.ID channel) throws IOException {
+               if (channel != null) {
+                       new File(channel.getPath()).delete();
+               }
+       }
+       
        // 
------------------------------------------------------------------------
        //                        Reader / Writer instantiations
        // 
------------------------------------------------------------------------
@@ -179,11 +192,20 @@ public abstract class IOManager {
         */
        public abstract BulkBlockChannelReader 
createBulkBlockChannelReader(FileIOChannel.ID channelID,
                        List<MemorySegment> targetSegments, int numBlocks) 
throws IOException;
-
-       // 
========================================================================
-       //                             Utilities
-       // 
========================================================================
-
+       
+       // 
------------------------------------------------------------------------
+       //                          Utilities
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Gets the number of directories across which the I/O manager rotates 
its files.
+        * 
+        * @return The number of temporary file directories.
+        */
+       public int getNumberOfTempDirs() {
+               return this.paths.length;
+       }
+       
        protected int getNextPathNum() {
                final int next = this.nextPath;
                final int newNext = next + 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 5d4c881..35297ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -43,7 +43,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
-import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.TraversableOnceException;
@@ -141,7 +141,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
        throws IOException, MemoryAllocationException
        {
                super(memoryManager, ioManager, input, parentTask, 
serializerFactory, comparator,
-                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false);
+                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, true);
                
                this.combineStub = combineStub;
        }
@@ -243,7 +243,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                                }
                                disposeSortBuffers(true);
                                
-                               // set result iterator
+                               // set lazy iterator
                                MutableObjectIterator<E> resIter = 
iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() :
                                                iterators.size() == 1 ? 
iterators.get(0) : 
                                                new MergeIterator<E>(iterators, 
this.comparator);
@@ -392,8 +392,8 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                        // ------------------- Merging Phase 
------------------------
 
                        // merge channels until sufficient file handles are 
available
-                       while (isRunning() && channelIDs.size() > 
this.maxNumFileHandles) {
-                               channelIDs = mergeChannelList(channelIDs, 
this.sortReadMemory, this.writeMemory);
+                       while (isRunning() && channelIDs.size() > 
this.maxFanIn) {
+                               channelIDs = mergeChannelList(channelIDs, 
this.mergeReadMemory, this.writeMemory);
                        }
                        
                        // from here on, we won't write again
@@ -413,7 +413,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                                List<List<MemorySegment>> readBuffers = new 
ArrayList<List<MemorySegment>>(channelIDs.size());
                                
                                // allocate the read memory and register it to 
be released
-                               getSegmentsForReaders(readBuffers, 
this.sortReadMemory, channelIDs.size());
+                               getSegmentsForReaders(readBuffers, 
this.mergeReadMemory, channelIDs.size());
                                
                                // get the readers and register them to be 
released
                                final MergeIterator<E> mergeIterator = 
getMergingIterator(
@@ -452,7 +452,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                        final List<FileIOChannel> channelAccesses = new 
ArrayList<FileIOChannel>(channelIDs.size());
 
                        // the list with the target iterators
-                       final MergeIterator<E> mergeIterator = 
getMergingIterator(channelIDs, readBuffers, channelAccesses);
+                       final MergeIterator<E> mergeIterator = 
getMergingIterator(channelIDs, readBuffers, channelAccesses, null);
                        final ReusingKeyGroupedIterator<E> groupedIter = new 
ReusingKeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
 
                        // create a new channel writer

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
new file mode 100644
index 0000000..83e003f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.FileChannelInputView;
+import org.apache.flink.runtime.io.disk.FileChannelOutputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LargeRecordHandler<T> {
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(LargeRecordHandler.class);
+       
+       private static final int MIN_SEGMENTS_FOR_KEY_SPILLING = 1;
+       
+       private static final int MAX_SEGMENTS_FOR_KEY_SPILLING = 4;
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private final TypeSerializer<T> serializer;
+       
+       private final TypeComparator<T> comparator;
+       
+       private TupleSerializer<Tuple> keySerializer;
+       
+       private TupleComparator<Tuple> keyComparator;
+       
+       private FileChannelOutputView recordsOutFile;
+       
+       private FileChannelOutputView keysOutFile;
+       
+       private Tuple keyTuple;
+       
+       private FileChannelInputView keysReader;
+       
+       private SeekableFileChannelInputView recordsReader;
+       
+       private FileIOChannel.ID recordsChannel;
+       
+       private FileIOChannel.ID keysChannel;
+       
+       private final IOManager ioManager;
+       
+       private final MemoryManager memManager;
+       
+       private final List<MemorySegment> memory;
+       
+       private TypeSerializerFactory<Tuple> keySerializerFactory;
+       
+       private UnilateralSortMerger<Tuple> keySorter;
+       
+       private final AbstractInvokable memoryOwner;
+       
+       private long recordCounter;
+       
+       private int numKeyFields;
+       
+       private final int maxFilehandles;
+       
+       private volatile boolean closed;
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public LargeRecordHandler(TypeSerializer<T> serializer, 
TypeComparator<T> comparator, 
+                       IOManager ioManager, MemoryManager memManager, 
List<MemorySegment> memory,
+                       AbstractInvokable memoryOwner, int maxFilehandles)
+       {
+               this.serializer = checkNotNull(serializer);
+               this.comparator = checkNotNull(comparator);
+               this.ioManager = checkNotNull(ioManager);
+               this.memManager = checkNotNull(memManager);
+               this.memory = checkNotNull(memory);
+               this.memoryOwner = checkNotNull(memoryOwner);
+               this.maxFilehandles = maxFilehandles;
+               
+               checkArgument(maxFilehandles >= 2);
+       }
+       
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       public long addRecord(T record) throws IOException {
+
+               if (recordsOutFile == null) {
+                       
+                       if (closed) {
+                               throw new IllegalStateException("The large 
record handler has been closed.");
+                       }
+                       if (recordsReader != null) {
+                               throw new IllegalStateException("The handler 
has already switched to sorting.");
+                       }
+                       
+                       LOG.debug("Initializing the large record spilling...");
+                       
+                       // initialize the utilities
+                       {
+                               final TypeComparator<?>[] keyComps = 
comparator.getFlatComparators();
+                               numKeyFields = keyComps.length;
+                               Object[] keyHolder = new Object[numKeyFields];
+                               
+                               comparator.extractKeys(record, keyHolder, 0);
+                               
+                               TypeSerializer<?>[] keySers = new 
TypeSerializer<?>[numKeyFields];
+                               TypeSerializer<?>[] tupleSers = new 
TypeSerializer<?>[numKeyFields + 1];
+                               
+                               int[] keyPos = new int[numKeyFields];
+                               
+                               for (int i = 0; i < numKeyFields; i++) {
+                                       keyPos[i] = i;
+                                       keySers[i] = 
createSerializer(keyHolder[i], i);
+                                       tupleSers[i] = keySers[i];
+                               }
+                               // add the long serializer for the offset
+                               tupleSers[numKeyFields] = 
LongSerializer.INSTANCE;
+                               
+                               keySerializer = new 
TupleSerializer<Tuple>((Class<Tuple>) Tuple.getTupleClass(numKeyFields+1), 
tupleSers);
+                               keyComparator = new 
TupleComparator<Tuple>(keyPos, keyComps, keySers);
+                               
+                               // create the serializer factory for the tuple 
serializer
+                               if (keySerializer.isStateful()) {
+                                       ClassLoader cl = 
getClassLoader(tupleSers);
+                                       
+                                       RuntimeStatefulSerializerFactory<Tuple> 
factory = 
+                                                       new 
RuntimeStatefulSerializerFactory<Tuple>(keySerializer, 
keySerializer.getTupleClass());
+                                       factory.setClassLoader(cl);
+                                       keySerializerFactory = factory;
+                               }
+                               else {
+                                       keySerializerFactory = new 
RuntimeStatelessSerializerFactory<Tuple>(keySerializer, 
keySerializer.getTupleClass());
+                               }
+                               
+                               keyTuple = keySerializer.createInstance();
+                       }
+                       
+                       // initialize the spilling
+                       final int totalNumSegments = memory.size();
+                       final int segmentsForKeys = (totalNumSegments >= 
2*MAX_SEGMENTS_FOR_KEY_SPILLING) ? MAX_SEGMENTS_FOR_KEY_SPILLING : 
+                               Math.max(MIN_SEGMENTS_FOR_KEY_SPILLING, 
totalNumSegments - MAX_SEGMENTS_FOR_KEY_SPILLING);
+                               
+                       List<MemorySegment> recordsMemory = new 
ArrayList<MemorySegment>();
+                       List<MemorySegment> keysMemory = new 
ArrayList<MemorySegment>();
+                       
+                       for (int i = 0; i < segmentsForKeys; i++) {
+                               keysMemory.add(memory.get(i));
+                       }
+                       for (int i = segmentsForKeys; i < totalNumSegments; 
i++) {
+                               recordsMemory.add(memory.get(i));
+                       }
+                       
+                       recordsChannel = ioManager.createChannel();
+                       keysChannel = ioManager.createChannel();
+                       
+                       recordsOutFile = new FileChannelOutputView(
+                                       
ioManager.createBlockChannelWriter(recordsChannel), memManager,
+                                       recordsMemory, 
memManager.getPageSize());
+                       
+                       keysOutFile = new FileChannelOutputView(
+                                       
ioManager.createBlockChannelWriter(keysChannel), memManager,
+                                       keysMemory, memManager.getPageSize());
+               }
+               
+               final long offset = recordsOutFile.getWriteOffset();
+               if (offset < 0) {
+                       throw new RuntimeException("wrong offset");
+               }
+               
+               Object[] keyHolder = new Object[numKeyFields];
+               
+               comparator.extractKeys(record, keyHolder, 0);
+               for (int i = 0; i < numKeyFields; i++) {
+                       keyTuple.setField(keyHolder[i], i);
+               }
+               keyTuple.setField(offset, numKeyFields);
+               
+               keySerializer.serialize(keyTuple, keysOutFile);
+               serializer.serialize(record, recordsOutFile);
+               
+               recordCounter++;
+               
+               return offset;
+       }
+       
	/**
	 * Finishes the spilling phase: closes the records and keys spill files, sorts the
	 * spilled (key, file-offset) tuples, and returns an iterator that yields the full
	 * records in key-sorted order by seeking into the records file.
	 *
	 * <p>Note: the given memory list is mutated — pages for the two file readers are
	 * removed from its tail, and the remaining segments are handed to the key sorter.
	 *
	 * @param memory The memory segments to use for reading the spill files and sorting the keys.
	 * @return An iterator over the spilled records, in the order of the sorted keys.
	 * @throws IOException Thrown if an I/O error occurs, or if the sorting thread is interrupted.
	 */
	public MutableObjectIterator<T> finishWriteAndSortKeys(List<MemorySegment> memory) throws IOException {
		if (recordsOutFile == null || keysOutFile == null) {
			throw new IllegalStateException("The LargeRecordHandler has not spilled any records");
		}
		
		// close the writers and remember how many bytes the last block of each file holds,
		// so the readers know where the valid data in the final block ends
		final int lastBlockBytesKeys;
		final int lastBlockBytesRecords;
		
		recordsOutFile.close();
		keysOutFile.close();
		lastBlockBytesKeys = keysOutFile.getBytesInLatestSegment();
		lastBlockBytesRecords = recordsOutFile.getBytesInLatestSegment();
		recordsOutFile = null;
		keysOutFile = null;
		
		// decide how many pages go to the readers (bounded fraction of the total memory);
		// the rest of the memory list is left for the key sorter below
		final int pagesForReaders = Math.max(3*MIN_SEGMENTS_FOR_KEY_SPILLING, Math.min(2*MAX_SEGMENTS_FOR_KEY_SPILLING, memory.size() / 50));
		final int pagesForKeyReader = Math.min(pagesForReaders - MIN_SEGMENTS_FOR_KEY_SPILLING, MAX_SEGMENTS_FOR_KEY_SPILLING);
		final int pagesForRecordReader = pagesForReaders - pagesForKeyReader;
		
		// grab memory for the record reader
		ArrayList<MemorySegment> memForRecordReader = new ArrayList<MemorySegment>();
		ArrayList<MemorySegment> memForKeysReader = new ArrayList<MemorySegment>();
		
		for (int i = 0; i < pagesForRecordReader; i++) {
			memForRecordReader.add(memory.remove(memory.size() - 1));
		}
		for (int i = 0; i < pagesForKeyReader; i++) {
			memForKeysReader.add(memory.remove(memory.size() - 1));
		}
		
		keysReader = new FileChannelInputView(ioManager.createBlockChannelReader(keysChannel),
				memManager, memForKeysReader, lastBlockBytesKeys);
		InputViewIterator<Tuple> keyIterator = new InputViewIterator<Tuple>(keysReader, keySerializer);
		
		// sort only the (small) key tuples; the remaining memory segments back the sorter
		keySorter = new UnilateralSortMerger<Tuple>(memManager, memory, ioManager, 
				keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false);

		// wait for the sorter to sort the keys
		MutableObjectIterator<Tuple> result;
		try {
			result = keySorter.getIterator();
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
		
		// seekable view over the records file; the fetching iterator seeks to the
		// offset stored in each sorted key tuple to pull the full record
		recordsReader = new SeekableFileChannelInputView(ioManager, recordsChannel, memManager, memForRecordReader, lastBlockBytesRecords);
		
		return new FetchingIterator<T>(serializer, result, recordsReader, keySerializer, numKeyFields);
	}
+       
	/**
	 * Closes all structures and deletes all temporary files.
	 * Even in the presence of failures, this method will try and continue closing
	 * files and deleting temporary files.
	 * 
	 * @throws IOException Thrown if an error occurred while closing/deleting the files.
	 */
	public void close() throws IOException {
		
		// we go on closing and deleting files in the presence of failures.
		// we remember the first exception to occur and re-throw it later
		Throwable ex = null;
		
		synchronized (this) {
			
			// make close() idempotent — only the first call performs the cleanup
			if (closed) {
				return;
			}
			closed = true;
			
			// close the writers
			if (recordsOutFile != null) {
				try {
					recordsOutFile.close();
					recordsOutFile = null;
				} catch (Throwable t) {
					LOG.error("Cannot close the large records spill file.", t);
					ex = ex == null ? t : ex;
				}
			}
			if (keysOutFile != null) {
				try {
					keysOutFile.close();
					keysOutFile = null;
				} catch (Throwable t) {
					LOG.error("Cannot close the large records key spill file.", t);
					ex = ex == null ? t : ex;
				}
			}
			
			// close the readers
			if (recordsReader != null) {
				try {
					recordsReader.close();
					recordsReader = null;
				} catch (Throwable t) {
					LOG.error("Cannot close the large records reader.", t);
					ex = ex == null ? t : ex;
				}
			}
			if (keysReader != null) {
				try {
					keysReader.close();
					keysReader = null;
				} catch (Throwable t) {
					LOG.error("Cannot close the large records key reader.", t);
					ex = ex == null ? t : ex;
				}
			}
			
			// delete the spill files (after the writers/readers on them are closed)
			if (recordsChannel != null) {
				try {
					ioManager.deleteChannel(recordsChannel);
					recordsChannel = null;
				} catch (Throwable t) {
					LOG.error("Cannot delete the large records spill file.", t);
					ex = ex == null ? t : ex;
				}
			}
			if (keysChannel != null) {
				try {
					ioManager.deleteChannel(keysChannel);
					keysChannel = null;
				} catch (Throwable t) {
					LOG.error("Cannot delete the large records key spill file.", t);
					ex = ex == null ? t : ex;
				}
			}
			
			// close the key sorter, which disposes its own temp files and memory
			if (keySorter != null) {
				try {
					keySorter.close();
					keySorter = null;
				} catch (Throwable t) {
					LOG.error("Cannot properly dispose the key sorter and clean up its temporary files.", t);
					ex = ex == null ? t : ex;
				}
			}
			
			// return this handler's memory segments to the memory manager
			memManager.release(memory);
			
			recordCounter = 0;
		}
		
		// re-throw the exception, if necessary
		if (ex != null) { 
			throw new IOException("An error occurred cleaning up spill files in the large record handler.", ex);
		}
	}
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public boolean hasData() {
+               return recordCounter > 0;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private static TypeSerializer<Object> createSerializer(Object key, int 
pos) {
+               if (key == null) {
+                       throw new NullKeyFieldException(pos);
+               }
+               try {
+                       TypeInformation<Object> info = 
TypeExtractor.getForObject(key);
+                       return info.createSerializer();
+               }
+               catch (Throwable t) {
+                       throw new RuntimeException("Could not create key 
serializer for type " + key);
+               }
+       }
+       
+       private static ClassLoader getClassLoader(Object[] objects) {
+               final ClassLoader appCl = 
LargeRecordHandler.class.getClassLoader();
+               
+               for (Object o : objects) {
+                       if (o != null && o.getClass().getClassLoader() != 
appCl) {
+                               return o.getClass().getClassLoader();
+                       }
+               }
+               
+               return appCl;
+       }
+       
+       private static final class FetchingIterator<T> implements 
MutableObjectIterator<T> {
+               
+               private final TypeSerializer<T> serializer;
+               
+               private final MutableObjectIterator<Tuple> tupleInput;
+               
+               private final SeekableFileChannelInputView recordsInputs;
+               
+               private Tuple value;
+               
+               private final int pointerPos;
+               
+               
+               public FetchingIterator(TypeSerializer<T> serializer, 
MutableObjectIterator<Tuple> tupleInput,
+                               SeekableFileChannelInputView recordsInputs, 
TypeSerializer<Tuple> tupleSerializer, int pointerPos) {
+                       this.serializer = serializer;
+                       this.tupleInput = tupleInput;
+                       this.recordsInputs = recordsInputs;
+                       this.pointerPos = pointerPos;
+                       
+                       this.value = tupleSerializer.createInstance();
+               }
+
+               @Override
+               public T next(T reuse) throws IOException {
+                       Tuple value = tupleInput.next(this.value);
+                       if (value != null) {
+                               this.value = value;
+                               long pointer = value.<Long>getField(pointerPos);
+                               
+                               recordsInputs.seek(pointer);
+                               return serializer.deserialize(recordsInputs);
+                       } else {
+                               return null;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
new file mode 100644
index 0000000..be8511b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class SortBuffer<T> implements InMemorySorter<T> {
+
+       private static final int OFFSET_LEN = 8;
+       
+       private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16;
+       
+       private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;
+       
+       private static final int MIN_REQUIRED_BUFFERS = 3;
+
+       // 
------------------------------------------------------------------------
+       //                               Members
+       // 
------------------------------------------------------------------------
+
+       private final byte[] swapBuffer;
+       
+       private final TypeSerializer<T> serializer;
+       
+       private final TypeComparator<T> comparator;
+       
+       private final SimpleCollectingOutputView recordCollector;
+       
+       private final RandomAccessInputView recordBuffer;
+       
+       private final RandomAccessInputView recordBufferForComparison;
+       
+       private MemorySegment currentSortIndexSegment;
+       
+       private final ArrayList<MemorySegment> freeMemory;
+       
+       private final ArrayList<MemorySegment> sortIndex;
+       
+       private final ArrayList<MemorySegment> recordBufferSegments;
+       
+       private long currentDataBufferOffset;
+       
+       private long sortIndexBytes;
+       
+       private int currentSortIndexOffset;
+       
+       private int numRecords;
+       
+       private final int numKeyBytes;
+       
+       private final int indexEntrySize;
+       
+       private final int indexEntriesPerSegment;
+       
+       private final int lastIndexEntryOffset;
+       
+       private final int segmentSize;
+       
+       private final int totalNumBuffers;
+       
+       private final boolean normalizedKeyFullyDetermines;
+       
+       private final boolean useNormKeyUninverted;
+       
+       
+       // 
-------------------------------------------------------------------------
+       // Constructors / Destructors
+       // 
-------------------------------------------------------------------------
+
+       public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> 
comparator, List<MemorySegment> memory) {
+               this(serializer, comparator, memory, 
DEFAULT_MAX_NORMALIZED_KEY_LEN);
+       }
+       
+       public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> 
comparator, 
+                       List<MemorySegment> memory, int maxNormalizedKeyBytes)
+       {
+               if (serializer == null || comparator == null || memory == null) 
{
+                       throw new NullPointerException();
+               }
+               if (maxNormalizedKeyBytes < 0) {
+                       throw new IllegalArgumentException("Maximal number of 
normalized key bytes must not be negative.");
+               }
+               
+               this.serializer = serializer;
+               this.comparator = comparator;
+               this.useNormKeyUninverted = !comparator.invertNormalizedKey();
+               
+               // check the size of the first buffer and record it. all 
further buffers must have the same size.
+               // the size must also be a power of 2
+               this.totalNumBuffers = memory.size();
+               if (this.totalNumBuffers < MIN_REQUIRED_BUFFERS) {
+                       throw new IllegalArgumentException("Normalized-Key 
sorter requires at least " + MIN_REQUIRED_BUFFERS + " memory buffers.");
+               }
+               this.segmentSize = memory.get(0).size();
+               
+               if (memory instanceof ArrayList<?>) {
+                       this.freeMemory = (ArrayList<MemorySegment>) memory;
+               }
+               else {
+                       this.freeMemory = new 
ArrayList<MemorySegment>(memory.size());
+                       this.freeMemory.addAll(memory);
+               }
+               
+               // create the buffer collections
+               this.sortIndex = new ArrayList<MemorySegment>(16);
+               this.recordBufferSegments = new ArrayList<MemorySegment>(16);
+               
+               // the views for the record collections
+               this.recordCollector = new 
SimpleCollectingOutputView(this.recordBufferSegments,
+                       new ListMemorySegmentSource(this.freeMemory), 
this.segmentSize);
+               this.recordBuffer = new 
RandomAccessInputView(this.recordBufferSegments, this.segmentSize);
+               this.recordBufferForComparison = new 
RandomAccessInputView(this.recordBufferSegments, this.segmentSize);
+               
+               // set up normalized key characteristics
+               if (this.comparator.supportsNormalizedKey()) {
+                       // compute the max normalized key length
+                       int numPartialKeys;
+                       try {
+                               numPartialKeys = 
this.comparator.getFlatComparators().length;
+                       } catch (Throwable t) {
+                               numPartialKeys = 1;
+                       }
+                       
+                       int maxLen = Math.min(maxNormalizedKeyBytes, 
MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys);
+                       
+                       this.numKeyBytes = 
Math.min(this.comparator.getNormalizeKeyLen(), maxLen);
+                       this.normalizedKeyFullyDetermines = 
!this.comparator.isNormalizedKeyPrefixOnly(this.numKeyBytes);
+               }
+               else {
+                       this.numKeyBytes = 0;
+                       this.normalizedKeyFullyDetermines = false;
+               }
+               
+               // compute the index entry size and limits
+               this.indexEntrySize = this.numKeyBytes + OFFSET_LEN;
+               this.indexEntriesPerSegment = segmentSize / this.indexEntrySize;
+               this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * 
this.indexEntrySize;
+               this.swapBuffer = new byte[this.indexEntrySize];
+               
+               // set to initial state
+               this.currentSortIndexSegment = nextMemorySegment();
+               this.sortIndex.add(this.currentSortIndexSegment);
+       }
+
+       // 
-------------------------------------------------------------------------
+       // Memory Segment
+       // 
-------------------------------------------------------------------------
+
+       /**
+        * Resets the sort buffer back to the state where it is empty. All 
contained data is discarded.
+        */
+       @Override
+       public void reset() {
+               // reset all offsets
+               this.numRecords = 0;
+               this.currentSortIndexOffset = 0;
+               this.currentDataBufferOffset = 0;
+               this.sortIndexBytes = 0;
+               
+               // return all memory
+               this.freeMemory.addAll(this.sortIndex);
+               this.freeMemory.addAll(this.recordBufferSegments);
+               this.sortIndex.clear();
+               this.recordBufferSegments.clear();
+               
+               // grab first buffers
+               this.currentSortIndexSegment = nextMemorySegment();
+               this.sortIndex.add(this.currentSortIndexSegment);
+               this.recordCollector.reset();
+       }
+
+       /**
+        * Checks whether the buffer is empty.
+        * 
+        * @return True, if no record is contained, false otherwise.
+        */
+       @Override
+       public boolean isEmpty() {
+               return this.numRecords == 0;
+       }
+       
+       /**
+        * Collects all memory segments from this sorter.
+        * 
+        * @return All memory segments from this sorter.
+        */
+       @Override
+       public List<MemorySegment> dispose() {
+               this.freeMemory.addAll(this.sortIndex);
+               this.freeMemory.addAll(this.recordBufferSegments);
+               
+               this.recordBufferSegments.clear();
+               this.sortIndex.clear();
+               
+               return this.freeMemory;
+       }
+       
+       /**
+        * Gets the total capacity of this sorter, in bytes.
+        * 
+        * @return The sorter's total capacity.
+        */
+       @Override
+       public long getCapacity() {
+               return ((long) this.totalNumBuffers) * this.segmentSize;
+       }
+       
+       /**
+        * Gets the number of bytes currently occupied in this sorter.
+        * 
+        * @return The number of bytes occupied.
+        */
+       @Override
+       public long getOccupancy() {
+               return this.currentDataBufferOffset + this.sortIndexBytes;
+       }
+
+       // 
-------------------------------------------------------------------------
+       // Retrieving and Writing
+       // 
-------------------------------------------------------------------------
+
+       /**
+        * Gets the record at the given logical position.
+        * 
+        * @param reuse The target object to deserialize the record into.
+        * @param logicalPosition The logical position of the record.
+        * @throws IOException Thrown, if an exception occurred during 
deserialization.
+        */
+       @Override
+       public T getRecord(T reuse, int logicalPosition) throws IOException {
+               return getRecordFromBuffer(reuse, readPointer(logicalPosition));
+       }
+
+       /**
+        * Writes a given record to this sort buffer. The written record will 
be appended and take
+        * the last logical position.
+        * 
+        * @param record The record to be written.
+        * @return True, if the record was successfully written, false, if the 
sort buffer was full.
+        * @throws IOException Thrown, if an error occurred while serializing 
the record into the buffers.
+        */
+       @Override
+       public boolean write(T record) throws IOException {
+               //check whether we need a new memory segment for the sort index
+               if (this.currentSortIndexOffset > this.lastIndexEntryOffset) {
+                       if (memoryAvailable()) {
+                               this.currentSortIndexSegment = 
nextMemorySegment();
+                               
this.sortIndex.add(this.currentSortIndexSegment);
+                               this.currentSortIndexOffset = 0;
+                               this.sortIndexBytes += this.segmentSize;
+                       } else {
+                               return false;
+                       }
+               }
+               
+               // add the pointer and the normalized key
+               
this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, 
this.currentDataBufferOffset);
+               if(this.numKeyBytes != 0) {
+                       this.comparator.putNormalizedKey(record, 
this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, 
this.numKeyBytes);
+               }
+               
+               // serialize the record into the data buffers
+               try {
+                       this.serializer.serialize(record, this.recordCollector);
+                       this.currentSortIndexOffset += this.indexEntrySize;
+                       this.currentDataBufferOffset = 
this.recordCollector.getCurrentOffset();
+                       this.numRecords++;
+                       return true;
+               } catch (EOFException eofex) {
+                       return false;
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //                           Access Utilities
+       // 
------------------------------------------------------------------------
+       
+       private final long readPointer(int logicalPosition) {
+               if (logicalPosition < 0 | logicalPosition >= this.numRecords) {
+                       throw new IndexOutOfBoundsException();
+               }
+               
+               final int bufferNum = logicalPosition / 
this.indexEntriesPerSegment;
+               final int segmentOffset = logicalPosition % 
this.indexEntriesPerSegment;
+               
+               return this.sortIndex.get(bufferNum).getLong(segmentOffset * 
this.indexEntrySize);
+       }
+       
+       private final T getRecordFromBuffer(T reuse, long pointer) throws 
IOException {
+               this.recordBuffer.setReadPosition(pointer);
+               return this.serializer.deserialize(reuse, this.recordBuffer);
+       }
+       
+       private final int compareRecords(long pointer1, long pointer2) {
+               this.recordBuffer.setReadPosition(pointer1);
+               this.recordBufferForComparison.setReadPosition(pointer2);
+               
+               try {
+                       return 
this.comparator.compareSerialized(this.recordBuffer, 
this.recordBufferForComparison);
+               } catch (IOException ioex) {
+                       throw new RuntimeException("Error comparing two 
records.", ioex);
+               }
+       }
+       
+       private final boolean memoryAvailable() {
+               return !this.freeMemory.isEmpty();
+       }
+       
+       private final MemorySegment nextMemorySegment() {
+               return this.freeMemory.remove(this.freeMemory.size() - 1);
+       }
+
+       // 
-------------------------------------------------------------------------
+       // Indexed Sorting
+       // 
-------------------------------------------------------------------------
+
+       @Override
+       public int compare(int i, int j) {
+               final int bufferNumI = i / this.indexEntriesPerSegment;
+               final int segmentOffsetI = (i % this.indexEntriesPerSegment) * 
this.indexEntrySize;
+               
+               final int bufferNumJ = j / this.indexEntriesPerSegment;
+               final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * 
this.indexEntrySize;
+               
+               final MemorySegment segI = this.sortIndex.get(bufferNumI);
+               final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
+               
+               int val = MemorySegment.compare(segI, segJ, segmentOffsetI + 
OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes);
+               
+               if (val != 0 || this.normalizedKeyFullyDetermines) {
+                       return this.useNormKeyUninverted ? val : -val;
+               }
+               
+               final long pointerI = segI.getLong(segmentOffsetI);
+               final long pointerJ = segJ.getLong(segmentOffsetJ);
+               
+               return compareRecords(pointerI, pointerJ);
+       }
+
+       @Override
+       public void swap(int i, int j) {
+               final int bufferNumI = i / this.indexEntriesPerSegment;
+               final int segmentOffsetI = (i % this.indexEntriesPerSegment) * 
this.indexEntrySize;
+               
+               final int bufferNumJ = j / this.indexEntriesPerSegment;
+               final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * 
this.indexEntrySize;
+               
+               final MemorySegment segI = this.sortIndex.get(bufferNumI);
+               final MemorySegment segJ = this.sortIndex.get(bufferNumJ);
+               
+               MemorySegment.swapBytes(segI, segJ, this.swapBuffer, 
segmentOffsetI, segmentOffsetJ, this.indexEntrySize);
+       }
+
+       @Override
+       public int size() {
+               return this.numRecords;
+       }
+
+       // 
-------------------------------------------------------------------------
+       
+       /**
+        * Gets an iterator over all records in this buffer in their logical 
order.
+        * 
+        * @return An iterator returning the records in their logical order.
+        */
+       @Override
+       public final MutableObjectIterator<T> getIterator() {
+               return new MutableObjectIterator<T>()
+               {
+                       private final int size = size();
+                       private int current = 0;
+                       
+                       private int currentSegment = 0;
+                       private int currentOffset = 0;
+                       
+                       private MemorySegment currentIndexSegment = 
sortIndex.get(0);
+
+                       @Override
+                       public T next(T target)
+                       {
+                               if (this.current < this.size) {
+                                       this.current++;
+                                       if (this.currentOffset > 
lastIndexEntryOffset) {
+                                               this.currentOffset = 0;
+                                               this.currentIndexSegment = 
sortIndex.get(++this.currentSegment);
+                                       }
+                                       
+                                       long pointer = 
this.currentIndexSegment.getLong(this.currentOffset);
+                                       this.currentOffset += indexEntrySize;
+                                       
+                                       try {
+                                               return 
getRecordFromBuffer(target, pointer);
+                                       }
+                                       catch (IOException ioe) {
+                                               throw new RuntimeException(ioe);
+                                       }
+                               }
+                               else {
+                                       return null;
+                               }
+                       }
+               };
+       }
+       
+       // 
------------------------------------------------------------------------
+       //                Writing to a DataOutputView
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Writes the records in this buffer in their logical order to the 
given output.
+        * 
+        * @param output The output view to write the records to.
+        * @throws IOException Thrown, if an I/O exception occurred writing to 
the output view.
+        */
+       @Override
+       public void writeToOutput(final ChannelWriterOutputView output) throws 
IOException {
+               int recordsLeft = this.numRecords;
+               int currentMemSeg = 0;
+               while (recordsLeft > 0)
+               {
+                       final MemorySegment currentIndexSegment = 
this.sortIndex.get(currentMemSeg++);
+                       int offset = 0;
+                       // check whether we have a full or partially full 
segment
+                       if (recordsLeft >= this.indexEntriesPerSegment) {
+                               // full segment
+                               for (;offset <= this.lastIndexEntryOffset; 
offset += this.indexEntrySize) {
+                                       final long pointer = 
currentIndexSegment.getLong(offset);
+                                       
this.recordBuffer.setReadPosition(pointer);
+                                       this.serializer.copy(this.recordBuffer, 
output);
+                                       
+                               }
+                               recordsLeft -= this.indexEntriesPerSegment;
+                       } else {
+                               // partially filled segment
+                               for (; recordsLeft > 0; recordsLeft--, offset 
+= this.indexEntrySize)
+                               {
+                                       final long pointer = 
currentIndexSegment.getLong(offset);
+                                       
this.recordBuffer.setReadPosition(pointer);
+                                       this.serializer.copy(this.recordBuffer, 
output);
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * Writes a subset of the records in this buffer in their logical order 
to the given output.
+        * 
+        * @param output The output view to write the records to.
+        * @param start The logical start position of the subset.
+        * @param num The number of elements to write.
+        * @throws IOException Thrown, if an I/O exception occurred writing to 
the output view.
+        */
+       @Override
+       public void writeToOutput(final ChannelWriterOutputView output, final 
int start, int num) throws IOException {
+               int currentMemSeg = start / this.indexEntriesPerSegment;
+               int offset = (start % this.indexEntriesPerSegment) * 
this.indexEntrySize;
+               
+               while (num > 0)
+               {
+                       final MemorySegment currentIndexSegment = 
this.sortIndex.get(currentMemSeg++);
+                       // check whether we have a full or partially full 
segment
+                       if (num >= this.indexEntriesPerSegment && offset == 0) {
+                               // full segment
+                               for (;offset <= this.lastIndexEntryOffset; 
offset += this.indexEntrySize) {
+                                       final long pointer = 
currentIndexSegment.getLong(offset);
+                                       
this.recordBuffer.setReadPosition(pointer);
+                                       this.serializer.copy(this.recordBuffer, 
output);
+                               }
+                               num -= this.indexEntriesPerSegment;
+                       } else {
+                               // partially filled segment
+                               for (; num > 0 && offset <= 
this.lastIndexEntryOffset; num--, offset += this.indexEntrySize)
+                               {
+                                       final long pointer = 
currentIndexSegment.getLong(offset);
+                                       
this.recordBuffer.setReadPosition(pointer);
+                                       this.serializer.copy(this.recordBuffer, 
output);
+                               }
+                       }
+                       offset = 0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index dabf9bd..51cc1cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -63,68 +64,46 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
        //                              Constants
        // 
------------------------------------------------------------------------
 
-       /**
-        * Logging.
-        */
+       /** Logging. */
        private static final Logger LOG = 
LoggerFactory.getLogger(UnilateralSortMerger.class);
        
-       /**
-        * Fix length records with a length below this threshold will be 
in-place sorted, if possible.
-        */
+       /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
        private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
        
-       /**
-        * The minimal number of buffers to use by the writers.
-        */
+       /** The minimal number of buffers to use by the writers. */
        protected static final int MIN_NUM_WRITE_BUFFERS = 2;
        
-       /**
-        * The maximal number of buffers to use by the writers.
-        */
-       protected static final int MAX_NUM_WRITE_BUFFERS = 64;
+       /** The maximal number of buffers to use by the writers. */
+       protected static final int MAX_NUM_WRITE_BUFFERS = 4;
        
-       /**
-        * The minimum number of segments that are required for the sort to 
operate.
-        */
+       /** The minimum number of segments that are required for the sort to 
operate. */
        protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
 
        // 
------------------------------------------------------------------------
        //                                  Threads
        // 
------------------------------------------------------------------------
 
-       /**
-        * The thread that reads the input channels into buffers and passes 
them on to the merger.
-        */
+       /** The thread that reads the input channels into buffers and passes 
them on to the merger. */
        private final ThreadBase<E> readThread;
 
-       /**
-        * The thread that merges the buffer handed from the reading thread.
-        */
+       /** The thread that merges the buffer handed from the reading thread. */
        private final ThreadBase<E> sortThread;
 
-       /**
-        * The thread that handles spilling to secondary storage.
-        */
+       /** The thread that handles spilling to secondary storage. */
        private final ThreadBase<E> spillThread;
        
        // 
------------------------------------------------------------------------
        //                                   Memory
        // 
------------------------------------------------------------------------
        
-       /**
-        * The memory segments used first for sorting and later for 
reading/pre-fetching
-        * during the external merge.
-        */
-       protected final ArrayList<MemorySegment> sortReadMemory;
+       /** The memory segments used first for sorting and later for 
reading/pre-fetching
+        * during the external merge. */
+       protected final List<MemorySegment> sortReadMemory;
        
-       /**
-        * The memory segments used to stage data to be written.
-        */
-       protected final ArrayList<MemorySegment> writeMemory;
+       /** The memory segments used to stage data to be written. */
+       protected final List<MemorySegment> writeMemory;
        
-       /**
-        * The memory manager through which memory is allocated and released.
-        */
+       /** The memory manager through which memory is allocated and released. 
*/
        protected final MemoryManager memoryManager;
        
        // 
------------------------------------------------------------------------
@@ -132,6 +111,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
        // 
------------------------------------------------------------------------
        
        /**
+        * The handler for large records, that do not go through the in-memory 
sorter as a whole, but
+        * directly go to disk.
+        */
+       private final LargeRecordHandler<E> largeRecordHandler;
+       
+       /**
         * Collection of all currently open channels, to be closed and deleted 
during cleanup.
         */
        private final HashSet<FileIOChannel> openChannels;
@@ -166,26 +151,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
        //                         Constructor & Shutdown
        // 
------------------------------------------------------------------------
 
-       /**
-        * Creates a new sorter that reads the data from a given reader and 
provides an iterator returning that
-        * data in a sorted manner. The memory is divided among sort buffers, 
write buffers and read buffers
-        * automatically.
-        * 
-        * @param memoryManager The memory manager from which to allocate the 
memory.
-        * @param ioManager The I/O manager, which is used to write temporary 
files to disk.
-        * @param input The input that is sorted by this sorter.
-        * @param parentTask The parent task, which owns all resources used by 
this sorter.
-        * @param serializerFactory The type serializer.
-        * @param comparator The type comparator establishing the order 
relation.
-        * @param memoryFraction The fraction of memory dedicated to sorting, 
merging and I/O.
-        * @param maxNumFileHandles The maximum number of files to be merged at 
once.
-        * @param startSpillingFraction The faction of the buffers that have to 
be filled before the spilling thread
-        *                              actually begins spilling data to disk.
-        * 
-        * @throws IOException Thrown, if an error occurs initializing the 
resources for external sorting.
-        * @throws MemoryAllocationException Thrown, if not enough memory can 
be obtained from the memory manager to
-        *                                   perform the sort.
-        */
        public UnilateralSortMerger(MemoryManager memoryManager, IOManager 
ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
@@ -196,27 +161,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                        memoryFraction, -1, maxNumFileHandles, 
startSpillingFraction);
        }
        
-       /**
-        * Creates a new sorter that reads the data from a given reader and 
provides an iterator returning that
-        * data in a sorted manner. The memory is divided among sort buffers, 
write buffers and read buffers
-        * automatically.
-        * 
-        * @param memoryManager The memory manager from which to allocate the 
memory.
-        * @param ioManager The I/O manager, which is used to write temporary 
files to disk.
-        * @param input The input that is sorted by this sorter.
-        * @param parentTask The parent task, which owns all resources used by 
this sorter.
-        * @param serializerFactory The type serializer.
-        * @param comparator The type comparator establishing the order 
relation.
-        * @param memoryFraction The fraction of memory dedicated to sorting, 
merging and I/O.
-        * @param numSortBuffers The number of distinct buffers to use creation 
of the initial runs.
-        * @param maxNumFileHandles The maximum number of files to be merged at 
once.
-        * @param startSpillingFraction The faction of the buffers that have to 
be filled before the spilling thread
-        *                              actually begins spilling data to disk.
-        * 
-        * @throws IOException Thrown, if an error occurs initializing the 
resources for external sorting.
-        * @throws MemoryAllocationException Thrown, if not enough memory can 
be obtained from the memory manager to
-        *                                   perform the sort.
-        */
        public UnilateralSortMerger(MemoryManager memoryManager, IOManager 
ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
@@ -225,37 +169,42 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
        throws IOException, MemoryAllocationException
        {
                this(memoryManager, ioManager, input, parentTask, 
serializerFactory, comparator,
-                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false);
+                       memoryFraction, numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, true);
        }
        
-       /**
-        * Internal constructor and constructor for subclasses that want to 
circumvent the spilling.
-        * 
-        * @param memoryManager The memory manager from which to allocate the 
memory.
-        * @param ioManager The I/O manager, which is used to write temporary 
files to disk.
-        * @param input The input that is sorted by this sorter.
-        * @param parentTask The parent task, which owns all resources used by 
this sorter.
-        * @param serializerFactory The type serializer.
-        * @param comparator The type comparator establishing the order 
relation.
-        * @param memoryFraction The fraction of memory dedicated to sorting, 
merging and I/O.
-        * @param numSortBuffers The number of distinct buffers to use creation 
of the initial runs.
-        * @param maxNumFileHandles The maximum number of files to be merged at 
once.
-        * @param startSpillingFraction The faction of the buffers that have to 
be filled before the spilling thread
-        *                              actually begins spilling data to disk.
-        * @param noSpillingMemory When set to true, no memory will be 
allocated for writing and no spilling thread
-        *                   will be spawned.
-        * 
-        * @throws IOException Thrown, if an error occurs initializing the 
resources for external sorting.
-        * @throws MemoryAllocationException Thrown, if not enough memory can 
be obtained from the memory manager to
-        *                                   perform the sort.
-        */
-       protected UnilateralSortMerger(MemoryManager memoryManager, IOManager 
ioManager,
+       public UnilateralSortMerger(MemoryManager memoryManager, 
List<MemorySegment> memory,
+                       IOManager ioManager,
+                       MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
+                       TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
+                       int numSortBuffers, int maxNumFileHandles,
+                       float startSpillingFraction, boolean handleLargeRecords)
+       throws IOException
+       {
+               this(memoryManager, memory, ioManager, input, parentTask, 
serializerFactory, comparator,
+                       numSortBuffers, maxNumFileHandles, 
startSpillingFraction, false, handleLargeRecords);
+       }
+       
+       protected UnilateralSortMerger(MemoryManager memoryManager,
+                       IOManager ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int numSortBuffers, int 
maxNumFileHandles,
-                       float startSpillingFraction, boolean noSpillingMemory)
+                       float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords)
        throws IOException, MemoryAllocationException
        {
+               this(memoryManager, memoryManager.allocatePages(parentTask, 
memoryManager.computeNumberOfPages(memoryFraction)),
+                               ioManager, input, parentTask, 
serializerFactory, comparator,
+                               numSortBuffers, maxNumFileHandles, 
startSpillingFraction, noSpillingMemory, true);
+       }
+       
+       protected UnilateralSortMerger(MemoryManager memoryManager, 
List<MemorySegment> memory,
+                       IOManager ioManager,
+                       MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
+                       TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
+                       int numSortBuffers, int maxNumFileHandles,
+                       float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords)
+       throws IOException
+       {
                // sanity checks
                if (memoryManager == null | (ioManager == null && 
!noSpillingMemory) | serializerFactory == null | comparator == null) {
                        throw new NullPointerException();
@@ -270,7 +219,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                this.memoryManager = memoryManager;
                
                // adjust the memory quotas to the page size
-               final int numPagesTotal = 
memoryManager.computeNumberOfPages(memoryFraction);
+               final int numPagesTotal = memory.size();
 
                if (numPagesTotal < MIN_NUM_WRITE_BUFFERS + 
MIN_NUM_SORT_MEM_SEGMENTS) {
                        throw new IllegalArgumentException("Too little memory 
provided to sorter to perform task. " +
@@ -280,43 +229,52 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                
                // determine how many buffers to use for writing
                final int numWriteBuffers;
-               if (noSpillingMemory) {
+               final int numLargeRecordBuffers;
+               
+               if (noSpillingMemory && !handleLargeRecords) {
                        numWriteBuffers = 0;
-               } else {
-                       // determine how many buffers we have when we do a full 
mere with maximal fan-in 
-                       final int minBuffers = MIN_NUM_WRITE_BUFFERS + 
maxNumFileHandles;
-                       final int desiredBuffers = MIN_NUM_WRITE_BUFFERS + 2 * 
maxNumFileHandles;
+                       numLargeRecordBuffers = 0;
+               }
+               else {
+                       int numConsumers = (noSpillingMemory ? 0 : 1) + 
(handleLargeRecords ? 2 : 0);
                        
-                       if (desiredBuffers > numPagesTotal) {
-                               numWriteBuffers = MIN_NUM_WRITE_BUFFERS;
-                               if (minBuffers > numPagesTotal) {
-                                       maxNumFileHandles = numPagesTotal - 
MIN_NUM_WRITE_BUFFERS;
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Reducing maximal 
merge fan-in to " + maxNumFileHandles + " due to limited memory availability 
during merge");
-                                       }
+                       // determine how many buffers we have when we do a full 
merge with maximal fan-in 
+                       final int minBuffersForMerging = maxNumFileHandles + 
numConsumers * MIN_NUM_WRITE_BUFFERS;
+
+                       if (minBuffersForMerging > numPagesTotal) {
+                               numWriteBuffers = noSpillingMemory ? 0 : 
MIN_NUM_WRITE_BUFFERS;
+                               numLargeRecordBuffers = handleLargeRecords ? 
2*MIN_NUM_WRITE_BUFFERS : 0;
+
+                               maxNumFileHandles = numPagesTotal - 
numConsumers * MIN_NUM_WRITE_BUFFERS;
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Reducing maximal merge 
fan-in to " + maxNumFileHandles + " due to limited memory availability during 
merge");
                                }
                        }
                        else {
                                // we are free to choose. make sure that we do 
not eat up too much memory for writing
-                               final int designatedWriteBuffers = 
numPagesTotal / (maxNumFileHandles + 1);
-                               final int fractional = numPagesTotal / 64;
-                               final int maximal = numPagesTotal - 
MIN_NUM_SORT_MEM_SEGMENTS;
+                               final int fractionalAuxBuffers = numPagesTotal 
/ (numConsumers * 100);
                                
-                               numWriteBuffers = 
Math.max(MIN_NUM_WRITE_BUFFERS,       // at least the lower bound
-                                       
Math.min(Math.min(MAX_NUM_WRITE_BUFFERS, maximal),              // at most the 
lower of the upper bounds
-                                       Math.min(designatedWriteBuffers, 
fractional)));                 // the lower of the average
+                               if (fractionalAuxBuffers >= 
MAX_NUM_WRITE_BUFFERS) {
+                                       numWriteBuffers = noSpillingMemory ? 0 
: MAX_NUM_WRITE_BUFFERS;
+                                       numLargeRecordBuffers = 
handleLargeRecords ? 2*MAX_NUM_WRITE_BUFFERS : 0;
+                               }
+                               else {
+                                       numWriteBuffers = noSpillingMemory ? 0 :
+                                                       
Math.max(MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers);  // at least the lower 
bound
+                                       
+                                       numLargeRecordBuffers = 
handleLargeRecords ? 
+                                                       
Math.max(2*MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers) // at least the lower 
bound
+                                                       : 0;
+                               }
                        }
                }
                
-               final int sortMemPages = numPagesTotal - numWriteBuffers;
+               final int sortMemPages = numPagesTotal - numWriteBuffers - 
numLargeRecordBuffers;
                final long sortMemory = ((long) sortMemPages) * 
memoryManager.getPageSize();
                
                // decide how many sort buffers to use
                if (numSortBuffers < 1) {
-                       if (sortMemory > 96 * 1024 * 1024) {
-                               numSortBuffers = 3;
-                       }
-                       else if (sortMemPages >= 2 * MIN_NUM_SORT_MEM_SEGMENTS) 
{
+                       if (sortMemory > 100 * 1024 * 1024) {
                                numSortBuffers = 2;
                        }
                        else {
@@ -326,27 +284,42 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                final int numSegmentsPerSortBuffer = sortMemPages / 
numSortBuffers;
                
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Instantiating sorter with " + sortMemPages + 
" pages of sorting memory (=" +
-                               sortMemory + " bytes total) divided over " + 
numSortBuffers + " sort buffers (" + 
-                               numSegmentsPerSortBuffer + " pages per buffer). 
Using " + numWriteBuffers + 
-                               " buffers for writing sorted results and 
merging maximally " + maxNumFileHandles +
-                               " streams at once.");
+                       LOG.debug(String.format("Instantiating sorter with %d 
pages of sorting memory (="
+                                       + "%d bytes total) divided over %d sort 
buffers (%d pages per buffer). Using %d" 
+                                       + " buffers for writing sorted results 
and merging maximally %d streams at once. "
+                                       + "Using %d memory segments for large 
record spilling.",
+                                       sortMemPages, sortMemory, 
numSortBuffers, numSegmentsPerSortBuffer, numWriteBuffers,
+                                       maxNumFileHandles, 
numLargeRecordBuffers));
                }
                
+               
+               this.sortReadMemory = memory;
                this.writeMemory = new 
ArrayList<MemorySegment>(numWriteBuffers);
-               this.sortReadMemory = new 
ArrayList<MemorySegment>(sortMemPages);
                
-               // allocate the memory
-               memoryManager.allocatePages(parentTask, this.sortReadMemory, 
sortMemPages);
+               final TypeSerializer<E> serializer = 
serializerFactory.getSerializer();
+               
+               // move some pages from the sort memory to the write memory
                if (numWriteBuffers > 0) {
-                       memoryManager.allocatePages(parentTask, 
this.writeMemory, numWriteBuffers);
+                       for (int i = 0; i < numWriteBuffers; i++) {
+                               
this.writeMemory.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 
1));
+                       }
+               }
+               if (numLargeRecordBuffers > 0) {
+                       List<MemorySegment> mem = new 
ArrayList<MemorySegment>();
+                       for (int i = 0; i < numLargeRecordBuffers; i++) {
+                               
mem.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
+                       }
+                       
+                       this.largeRecordHandler = new 
LargeRecordHandler<E>(serializer, comparator.duplicate(), 
+                                       ioManager, memoryManager, mem, 
parentTask, maxNumFileHandles);
+               }
+               else {
+                       this.largeRecordHandler = null;
                }
                
                // circular queues pass buffers between the threads
                final CircularQueues<E> circularQueues = new 
CircularQueues<E>();
                
-               final TypeSerializer<E> serializer = 
serializerFactory.getSerializer();
-               
                // allocate the sort buffers and fill empty queue with them
                final Iterator<MemorySegment> segments = 
this.sortReadMemory.iterator();
                for (int i = 0; i < numSortBuffers; i++)
@@ -390,8 +363,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                this.openChannels = new HashSet<FileIOChannel>(64);
 
                // start the thread that reads the input channels
-               this.readThread = getReadingThread(exceptionHandler, input, 
circularQueues, parentTask,
-                       serializer, ((long) (startSpillingFraction * 
sortMemory)));
+               this.readThread = getReadingThread(exceptionHandler, input, 
circularQueues, largeRecordHandler,
+                               parentTask, serializer, ((long) 
(startSpillingFraction * sortMemory)));
 
                // start the thread that sorts the buffers
                this.sortThread = getSortingThread(exceptionHandler, 
circularQueues, parentTask);
@@ -543,6 +516,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                                }
                                catch (Throwable t) {}
                        }
+                       
+                       try {
+                               if (this.largeRecordHandler != null) {
+                                       this.largeRecordHandler.close();
+                               }
+                       } catch (Throwable t) {}
                }
        }
 
@@ -574,11 +553,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
         *         them into a queue.
         */
        protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> 
exceptionHandler,
-                       MutableObjectIterator<E> reader, CircularQueues<E> 
queues, AbstractInvokable parentTask,
+                       MutableObjectIterator<E> reader, CircularQueues<E> 
queues, 
+                       LargeRecordHandler<E> largeRecordHandler, 
AbstractInvokable parentTask,
                        TypeSerializer<E> serializer, long startSpillingBytes)
        {
-               return new ReadingThread<E>(exceptionHandler, reader, queues, 
serializer.createInstance(),
-                       parentTask, startSpillingBytes);
+               return new ReadingThread<E>(exceptionHandler, reader, queues, 
largeRecordHandler, 
+                               serializer.createInstance(),parentTask, 
startSpillingBytes);
        }
 
        protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> 
exceptionHandler, CircularQueues<E> queues,
@@ -853,19 +833,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
         */
        protected static class ReadingThread<E> extends ThreadBase<E> {
                
-               /**
-                * The input channels to read from.
-                */
+               /** The input channels to read from. */
                private final MutableObjectIterator<E> reader;
                
-               /**
-                * The fraction of the buffers that must be full before the 
spilling starts.
-                */
+               private final LargeRecordHandler<E> largeRecords;
+               
+               /** The fraction of the buffers that must be full before the 
spilling starts. */
                private final long startSpillingBytes;
                
-               /**
-                * The object into which the thread reads the data from the 
input.
-                */
+               /** The object into which the thread reads the data from the 
input. */
                private final E readTarget;
 
                /**
@@ -878,7 +854,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                 */
                public ReadingThread(ExceptionHandler<IOException> 
exceptionHandler,
                                MutableObjectIterator<E> reader, 
CircularQueues<E> queues,
-                               E readTarget,
+                               LargeRecordHandler<E> largeRecordsHandler, E 
readTarget,
                                AbstractInvokable parentTask, long 
startSpillingBytes)
                {
                        super(exceptionHandler, "SortMerger Reading Thread", 
queues, parentTask);
@@ -887,14 +863,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                        this.reader = reader;
                        this.readTarget = readTarget;
                        this.startSpillingBytes = startSpillingBytes;
+                       this.largeRecords = largeRecordsHandler;
                }
 
                /**
                 * The entry point for the thread. Gets a buffer for all 
threads and then loops as long as there is input
                 * available.
                 */
-               public void go() throws IOException
-               {       
+               public void go() throws IOException {
+                       
                        final MutableObjectIterator<E> reader = this.reader;
                        
                        E current = this.readTarget;
@@ -921,12 +898,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                                                element = 
this.queues.empty.take();
                                        }
                                        catch (InterruptedException iex) {
-                                               if (isRunning()) {
-                                                       LOG.error("Reading 
thread was interrupted (without being shut down) while grabbing a buffer. " +
-                                                                       
"Retrying to grab buffer...");
-                                               } else {
-                                                       return;
-                                               }
+                                               throw new IOException(iex);
                                        }
                                }
                                
@@ -943,7 +915,13 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
                                // write the last leftover pair, if we have one
                                if (leftoverRecord != null) {
                                        if (!buffer.write(leftoverRecord)) {
-                                               throw new IOException("Record 
could not be written to empty buffer: Serialized record exceeds buffer 
capacity.");
+                                               if (this.largeRecords != null) {
+                                                       
this.largeRecords.addRecord(leftoverRecord);
+                                               } else {
+                                                       throw new 
IOException("The record exceeds the maximum size of a sort buffer (current 
maximum: "
+                                                                       + 
buffer.getCapacity() + " bytes).");
+                                               }
+                                               buffer.reset();
                                        }
                                        leftoverRecord = null;
                                }
@@ -1045,6 +1023,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                        this.queues.sort.add(element);
                                }
                                else {
+                                       buffer.reset();
                                        this.queues.empty.add(element);
                                }
                                element = null;
@@ -1088,8 +1067,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                /**
                 * Entry point of the thread.
                 */
-               public void go() throws IOException
-               {                       
+               public void go() throws IOException {
                        boolean alive = true;
 
                        // loop as long as the thread is marked alive
@@ -1113,10 +1091,17 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                                }
 
                                if (element != EOF_MARKER && element != 
SPILLING_MARKER) {
+                                       
+                                       if (element.buffer.size() == 0) {
+                                               element.buffer.reset();
+                                               this.queues.empty.add(element);
+                                               continue;
+                                       }
+                                       
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Sorting buffer " + 
element.id + ".");
                                        }
-                                       
+
                                        this.sorter.sort(element.buffer);
                                        
                                        if (LOG.isDebugEnabled()) {
@@ -1150,9 +1135,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                
                protected final List<MemorySegment> writeMemory;        // 
memory segments for writing
                
-               protected final List<MemorySegment> sortReadMemory;     // 
memory segments for sorting/reading
+               protected final List<MemorySegment> mergeReadMemory;    // 
memory segments for sorting/reading
                
-               protected final int maxNumFileHandles;
+               protected final int maxFanIn;
                
                protected final int numWriteBuffersToCluster;
                
@@ -1180,9 +1165,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                        this.ioManager = ioManager;
                        this.serializer = serializer;
                        this.comparator = comparator;
-                       this.sortReadMemory = sortReadMemory;
+                       this.mergeReadMemory = sortReadMemory;
                        this.writeMemory = writeMemory;
-                       this.maxNumFileHandles = maxNumFileHandles;
+                       this.maxFanIn = maxNumFileHandles;
                        this.numWriteBuffersToCluster = writeMemory.size() >= 4 
? writeMemory.size() / 2 : 1;
                }
 
@@ -1203,14 +1188,9 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                                        element = this.queues.spill.take();
                                }
                                catch (InterruptedException iex) {
-                                       if (isRunning()) {
-                                               LOG.error("Sorting thread was 
interrupted (without being shut down) while grabbing a buffer. " +
-                                                               "Retrying to 
grab buffer...");
-                                               continue;
-                                       } else {
-                                               return;
-                                       }
+                                       throw new IOException("The spilling 
thread was interrupted.");
                                }
+
                                if (element == SPILLING_MARKER) {
                                        break;
                                }
@@ -1226,21 +1206,46 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                                return;
                        }
                        
+                       MutableObjectIterator<E> largeRecords = null;
+                       
+                       // check if we can stay in memory with the large record 
handler
+                       if (cacheOnly && largeRecordHandler != null && 
largeRecordHandler.hasData()) {
+                               List<MemorySegment> memoryForLargeRecordSorting 
= new ArrayList<MemorySegment>();
+                               
+                               CircularElement<E> circElement;
+                               while ((circElement = this.queues.empty.poll()) 
!= null) {
+                                       
memoryForLargeRecordSorting.addAll(circElement.buffer.dispose());
+                               }
+                               
+                               if (memoryForLargeRecordSorting.isEmpty()) {
+                                       cacheOnly = false;
+                                       LOG.debug("Going to disk-based merge 
because of large records.");
+                                       
+                               } else {
+                                       LOG.debug("Sorting large records, to 
add them to in-memory merge.");
+                                       largeRecords = 
largeRecordHandler.finishWriteAndSortKeys(memoryForLargeRecordSorting);
+                               }
+                       }
+                       
                        // ------------------- In-Memory Merge 
------------------------
                        if (cacheOnly) {
-                               /* operates on in-memory segments only */
+                               // operates on in-memory buffers only
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Initiating in memory 
merge.");
                                }
                                
-                               List<MutableObjectIterator<E>> iterators = new 
ArrayList<MutableObjectIterator<E>>(cache.size());
-                                                               
+                               List<MutableObjectIterator<E>> iterators = new 
ArrayList<MutableObjectIterator<E>>(cache.size() + 1);
+                               
                                // iterate buffers and collect a set of 
iterators
                                for (CircularElement<E> cached : cache) {
                                        // note: the yielded iterator only 
operates on the buffer heap (and disregards the stack)
                                        
iterators.add(cached.buffer.getIterator());
                                }
                                
+                               if (largeRecords != null) {
+                                       iterators.add(largeRecords);
+                               }
+                               
                                // release the remaining sort-buffers
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Releasing unused sort-buffer 
memory.");
@@ -1252,7 +1257,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                                iterators.size() == 1 ? 
iterators.get(0) : 
                                                new MergeIterator<E>(iterators, 
this.comparator));
                                return;
-                       }                       
+                       }
                        
                        // ------------------- Spilling Phase 
------------------------
                        
@@ -1261,7 +1266,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
 
                        
                        // loop as long as the thread is marked alive and we do 
not see the final element
-                       while (isRunning())     {
+                       while (isRunning()) {
                                try {
                                        element = takeNext(this.queues.spill, 
cache);
                                }
@@ -1322,11 +1327,50 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                        // clear the sort buffers, but do not return the memory 
to the manager, as we use it for merging
                        disposeSortBuffers(false);
 
+                       
                        // ------------------- Merging Phase 
------------------------
                        
+                       // make sure we have enough memory to merge and for 
large record handling
+                       List<MemorySegment> mergeReadMemory;
+                       
+                       if (largeRecordHandler != null && 
largeRecordHandler.hasData()) {
+                               
+                               List<MemorySegment> longRecMem;
+                               if (channelIDs.isEmpty()) {
+                                       // only long records
+                                       longRecMem = this.mergeReadMemory;
+                                       mergeReadMemory = 
Collections.emptyList();
+                               }
+                               else {
+                                       int maxMergedStreams = 
Math.min(this.maxFanIn, channelIDs.size());
+                                       
+                                       int pagesPerStream = 
Math.max(MIN_NUM_WRITE_BUFFERS,
+                                                       
Math.min(MAX_NUM_WRITE_BUFFERS, this.mergeReadMemory.size() / 2 / 
maxMergedStreams));
+                                       
+                                       int totalMergeReadMemory = 
maxMergedStreams * pagesPerStream;
+                                       
+                                       // grab the merge memory
+                                       mergeReadMemory = new 
ArrayList<MemorySegment>(totalMergeReadMemory);
+                                       for (int i = 0; i < 
totalMergeReadMemory; i++) {
+                                               
mergeReadMemory.add(this.mergeReadMemory.get(i));
+                                       }
+                                       
+                                       // the remainder of the memory goes to 
the long record sorter
+                                       longRecMem = new 
ArrayList<MemorySegment>();
+                                       for (int i = totalMergeReadMemory; i < 
this.mergeReadMemory.size(); i++) {
+                                               
longRecMem.add(this.mergeReadMemory.get(i));
+                                       }
+                               }
+                               
+                               largeRecords = 
largeRecordHandler.finishWriteAndSortKeys(longRecMem);
+                       }
+                       else {
+                               mergeReadMemory = this.mergeReadMemory;
+                       }
+                       
                        // merge channels until sufficient file handles are 
available
-                       while (isRunning() && channelIDs.size() > 
this.maxNumFileHandles) {
-                               channelIDs = mergeChannelList(channelIDs, 
this.sortReadMemory, this.writeMemory);
+                       while (isRunning() && channelIDs.size() > 
this.maxFanIn) {
+                               channelIDs = mergeChannelList(channelIDs, 
mergeReadMemory, this.writeMemory);
                        }
                        
                        // from here on, we won't write again
@@ -1335,7 +1379,11 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                        
                        // check if we have spilled some data at all
                        if (channelIDs.isEmpty()) {
-                               
setResultIterator(EmptyMutableObjectIterator.<E>get());
+                               if (largeRecords == null) {
+                                       
setResultIterator(EmptyMutableObjectIterator.<E>get());
+                               } else {
+                                       setResultIterator(largeRecords);
+                               }
                        }
                        else {
                                if (LOG.isDebugEnabled()) {
@@ -1346,10 +1394,10 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                                List<List<MemorySegment>> readBuffers = new 
ArrayList<List<MemorySegment>>(channelIDs.size());
                                
                                // allocate the read memory and register it to 
be released
-                               getSegmentsForReaders(readBuffers, 
this.sortReadMemory, channelIDs.size());
+                               getSegmentsForReaders(readBuffers, 
mergeReadMemory, channelIDs.size());
                                
                                // get the readers and register them to be 
released
-                               
setResultIterator(getMergingIterator(channelIDs, readBuffers, new 
ArrayList<FileIOChannel>(channelIDs.size())));
+                               
setResultIterator(getMergingIterator(channelIDs, readBuffers, new 
ArrayList<FileIOChannel>(channelIDs.size()), largeRecords));
                        }
 
                        // done
@@ -1361,8 +1409,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                /**
                 * Releases the memory that is registered for in-memory sorted 
run generation.
                 */
-               protected final void disposeSortBuffers(boolean releaseMemory)
-               {
+               protected final void disposeSortBuffers(boolean releaseMemory) {
                        while (!this.queues.empty.isEmpty()) {
                                try {
                                        final InMemorySorter<?> sorter = 
this.queues.empty.take().buffer;
@@ -1403,7 +1450,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                 * @throws IOException Thrown, if the readers encounter an I/O 
problem.
                 */
                protected final MergeIterator<E> getMergingIterator(final 
List<ChannelWithBlockCount> channelIDs,
-                               final List<List<MemorySegment>> inputSegments, 
List<FileIOChannel> readerList)
+                               final List<List<MemorySegment>> inputSegments, 
List<FileIOChannel> readerList, MutableObjectIterator<E> largeRecords)
                        throws IOException
                {
                        // create one iterator per channel id
@@ -1411,7 +1458,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                LOG.debug("Performing merge of " + 
channelIDs.size() + " sorted streams.");
                        }
                        
-                       final List<MutableObjectIterator<E>> iterators = new 
ArrayList<MutableObjectIterator<E>>(channelIDs.size());
+                       final List<MutableObjectIterator<E>> iterators = new 
ArrayList<MutableObjectIterator<E>>(channelIDs.size() + 1);
                        
                        for (int i = 0; i < channelIDs.size(); i++) {
                                final ChannelWithBlockCount channel = 
channelIDs.get(i);
@@ -1429,6 +1476,10 @@ public class UnilateralSortMerger<E> implements 
Sorter<E> {
                                                                                
                                                                        
channel.getBlockCount(), false);
                                iterators.add(new 
ChannelReaderInputViewIterator<E>(inView, null, this.serializer));
                        }
+                       
+                       if (largeRecords != null) {
+                               iterators.add(largeRecords);
+                       }
 
                        return new MergeIterator<E>(iterators, this.comparator);
                }
@@ -1446,7 +1497,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                        final List<MemorySegment> 
allReadBuffers, final List<MemorySegment> writeBuffers)
                throws IOException
                {
-                       final double numMerges = Math.ceil(channelIDs.size() / 
((double) this.maxNumFileHandles));
+                       final double numMerges = Math.ceil(channelIDs.size() / 
((double) this.maxFanIn));
                        final int channelsToMergePerStep = (int) 
Math.ceil(channelIDs.size() / numMerges);
                        
                        // allocate the memory for the merging step
@@ -1494,7 +1545,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                        final List<FileIOChannel> channelAccesses = new 
ArrayList<FileIOChannel>(channelIDs.size());
 
                        // the list with the target iterators
-                       final MergeIterator<E> mergeIterator = 
getMergingIterator(channelIDs, readBuffers, channelAccesses);
+                       final MergeIterator<E> mergeIterator = 
getMergingIterator(channelIDs, readBuffers, channelAccesses, null);
 
                        // create a new channel writer
                        final FileIOChannel.ID mergedChannelID = 
this.ioManager.createChannel();

Reply via email to