http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index a340ef2..914359d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -16,18 +16,31 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
-import java.util.Comparator;
+import static org.junit.Assert.*;
 
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -348,4 +361,214 @@ public class ExternalSortITCase {
                Assert.assertEquals("Not all pairs were read back in.", PAIRS, 
pairsRead);
                merger.close();
        }
+       
+       @Test
+       public void testSortWithLongRecordsOnly() {
+               try {
+                       final int NUM_RECORDS = 10;
+                       
+                       final TypeInformation<?>[] types = new 
TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new 
ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+                               };
+                       
+                       final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> 
typeInfo = 
+                                                               new 
TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+                       final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> 
serializer = typeInfo.createSerializer();
+                       final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> 
comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+                       
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source = 
+                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>()
+                       {
+                               private final Random rnd = new Random();
+                               private int num = 0;
+                               
+                               @Override
+                               public Tuple2<Long, SomeMaybeLongValue> 
next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+                                       if (num++ < NUM_RECORDS) {
+                                               long val = rnd.nextLong();
+                                               return new Tuple2<Long, 
SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
+                                       }
+                                       else {
+                                               return null;
+                                       }
+                                       
+                               }
+                       };
+                       
+                       @SuppressWarnings("unchecked")
+                       Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new 
UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+                                       this.memoryManager, this.ioManager, 
+                                       source, this.parentTask,
+                                       new 
RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, 
(Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+                                       comparator, 1.0, 1, 128, 0.7f);
+                       
+                       // check order
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
iterator = sorter.getIterator();
+                       
+                       Tuple2<Long, SomeMaybeLongValue> val = 
serializer.createInstance();
+                       
+                       long prevKey = Long.MAX_VALUE;
+
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               val = iterator.next(val);
+                               
+                               assertTrue(val.f0 <= prevKey);
+                               assertTrue(val.f0.intValue() == val.f1.val());
+                       }
+                       
+                       assertNull(iterator.next(val));
+                       
+                       sorter.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testSortWithLongAndShortRecordsMixed() {
+               try {
+                       final int NUM_RECORDS = 1000000;
+                       final int LARGE_REC_INTERVAL = 100000;
+                       
+                       final TypeInformation<?>[] types = new 
TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new 
ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+                               };
+                       
+                       final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> 
typeInfo = 
+                                                               new 
TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+                       final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> 
serializer = typeInfo.createSerializer();
+                       final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> 
comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+                       
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source = 
+                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>()
+                       {
+                               private final Random rnd = new Random();
+                               private int num = -1;
+                               
+                               @Override
+                               public Tuple2<Long, SomeMaybeLongValue> 
next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+                                       if (++num < NUM_RECORDS) {
+                                               long val = rnd.nextLong();
+                                               return new Tuple2<Long, 
SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % 
LARGE_REC_INTERVAL == 0));
+                                       }
+                                       else {
+                                               return null;
+                                       }
+                                       
+                               }
+                       };
+                       
+                       @SuppressWarnings("unchecked")
+                       Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new 
UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+                                       this.memoryManager, this.ioManager, 
+                                       source, this.parentTask,
+                                       new 
RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, 
(Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+                                       comparator, 1.0, 1, 128, 0.7f);
+                       
+                       // check order
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
iterator = sorter.getIterator();
+                       
+                       Tuple2<Long, SomeMaybeLongValue> val = 
serializer.createInstance();
+                       
+                       long prevKey = Long.MAX_VALUE;
+
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               val = iterator.next(val);
+                               
+                               assertTrue(val.f0 <= prevKey);
+                               assertTrue(val.f0.intValue() == val.f1.val());
+                       }
+                       
+                       assertNull(iterator.next(val));
+                       
+                       sorter.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static final class SomeMaybeLongValue implements 
org.apache.flink.types.Value {
+               
+               private static final long serialVersionUID = 1L;
+
+               private static final byte[] BUFFER = new byte[100000000];
+               
+               static {
+                       for (int i = 0; i < BUFFER.length; i++) {
+                               BUFFER[i] = (byte) i;
+                       }
+               }
+               
+               private int val;
+               
+               private boolean isLong;
+               
+
+               public SomeMaybeLongValue() {
+                       this.isLong = true;
+               }
+               
+               public SomeMaybeLongValue(int val) {
+                       this.val = val;
+                       this.isLong = true;
+               }
+               
+               public SomeMaybeLongValue(int val, boolean isLong) {
+                       this.val = val;
+                       this.isLong = isLong;
+               }
+               
+               public int val() {
+                       return val;
+               }
+               
+               public boolean isLong() {
+                       return isLong;
+               }
+               
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       val = in.readInt();
+                       isLong = in.readBoolean();
+                       
+                       if (isLong) {
+                               for (int i = 0; i < BUFFER.length; i++) {
+                                       byte b = in.readByte();
+                                       assertEquals(BUFFER[i], b);
+                               }
+                       }
+               }
+               
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       out.writeInt(val);
+                       out.writeBoolean(isLong);
+                       if (isLong) {
+                               out.write(BUFFER);
+                       }
+               }
+               
+               @Override
+               public int hashCode() {
+                       return val;
+               }
+               
+               @Override
+               public boolean equals(Object obj) {
+                       return (obj instanceof SomeMaybeLongValue) && 
((SomeMaybeLongValue) obj).val == this.val;
+               }
+               
+               @Override
+               public String toString() {
+                       return isLong ? "Large Value" : "Small Value";
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
new file mode 100644
index 0000000..3d237f7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.FileChannelOutputView;
+import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+public class LargeRecordHandlerITCase {
+
+       @Test
+       public void testRecordHandlerCompositeKey() {
+               
+               final IOManager ioMan = new IOManagerAsync();
+               final int PAGE_SIZE = 4 * 1024;
+               final int NUM_PAGES = 1000;
+               final int NUM_RECORDS = 10;
+               
+               try {
+                       final DefaultMemoryManager memMan = new 
DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+                       final AbstractInvokable owner = new DummyInvokable();
+                       
+                       final List<MemorySegment> initialMemory = 
memMan.allocatePages(owner, 6);
+                       final List<MemorySegment> sortMemory = 
memMan.allocatePages(owner, NUM_PAGES - 6);
+
+                       final TypeInformation<?>[] types = new 
TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new 
ValueTypeInfo<SomeVeryLongValue>(SomeVeryLongValue.class),
+                                       BasicTypeInfo.BYTE_TYPE_INFO
+                               };
+                       
+                       final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, 
Byte>> typeInfo = 
+                                                               new 
TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types);
+
+                       final TypeSerializer<Tuple3<Long, SomeVeryLongValue, 
Byte>> serializer = typeInfo.createSerializer();
+                       final TypeComparator<Tuple3<Long, SomeVeryLongValue, 
Byte>> comparator = typeInfo.createComparator(
+                                       new int[] {2, 0}, new boolean[] {true, 
true}, 0);
+                       
+                       LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, 
Byte>> handler = new LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, Byte>>(
+                                       serializer, comparator, ioMan, memMan, 
initialMemory, owner, 128);
+                       
+                       assertFalse(handler.hasData());
+                       
+                       
+                       // add the test data
+                       Random rnd = new Random();
+                       
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               long val = rnd.nextLong();
+                               handler.addRecord(new Tuple3<Long, 
SomeVeryLongValue, Byte>(val, new SomeVeryLongValue((int) val), (byte) val));
+                               assertTrue(handler.hasData());
+                       }
+                       
+                       MutableObjectIterator<Tuple3<Long, SomeVeryLongValue, 
Byte>> sorted = handler.finishWriteAndSortKeys(sortMemory);
+                       
+                       try {
+                               handler.addRecord(new Tuple3<Long, 
SomeVeryLongValue, Byte>(92L, null, (byte) 1));
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+                       
+                       Tuple3<Long, SomeVeryLongValue, Byte> previous = null;
+                       Tuple3<Long, SomeVeryLongValue, Byte> next;
+                       
+                       while ((next = sorted.next(null)) != null) {
+                               // key and value must be equal
+                               assertTrue(next.f0.intValue() == next.f1.val());
+                               assertTrue(next.f0.byteValue() == next.f2);
+                               
+                               // order must be correct
+                               if (previous != null) {
+                                       assertTrue(previous.f2 <= next.f2);
+                                       assertTrue(previous.f2.byteValue() != 
next.f2.byteValue() || previous.f0 <= next.f0);
+                               }
+                               previous = next;
+                       }
+                       
+                       handler.close();
+                       
+                       assertFalse(handler.hasData());
+                       
+                       handler.close();
+                       
+                       try {
+                               handler.addRecord(new Tuple3<Long, 
SomeVeryLongValue, Byte>(92L, null, (byte) 1));
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+                       
+                       assertTrue(memMan.verifyEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (ioMan != null) {
+                               ioMan.shutdown();
+                       }
+               }
+       }
+       
+       public static final class SomeVeryLongValue implements Value {
+               
+               private static final long serialVersionUID = 1L;
+
+               private static final byte[] BUFFER = new byte[50000000];
+               
+//             private static final byte[] BUFFER = new byte[500000000];
+               
+               static {
+                       for (int i = 0; i < BUFFER.length; i++) {
+                               BUFFER[i] = (byte) i;
+                       }
+               }
+               
+               private int val;
+
+               public SomeVeryLongValue() {}
+               
+               public SomeVeryLongValue(int val) {
+                       this.val = val;
+               }
+               
+               public int val() {
+                       return val;
+               }
+               
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       val = in.readInt();
+                       for (int i = 0; i < BUFFER.length; i++) {
+                               byte b = in.readByte();
+                               assertEquals(BUFFER[i], b);
+                       }
+               }
+               
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       out.writeInt(val);
+                       out.write(BUFFER);
+               }
+       }
+       
+       
+//     @Test
+       public void fileTest() {
+               
+               final IOManager ioMan = new IOManagerAsync();
+               final int PAGE_SIZE = 4 * 1024;
+               final int NUM_PAGES = 4;
+               final int NUM_RECORDS = 10;
+               
+               FileIOChannel.ID channel = null;
+               
+               try {
+                       final DefaultMemoryManager memMan = new 
DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+                       final AbstractInvokable owner = new DummyInvokable();
+                       
+                       final List<MemorySegment> memory = 
memMan.allocatePages(owner, NUM_PAGES);
+
+                       final TypeInformation<?>[] types = new 
TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new 
ValueTypeInfo<SomeVeryLongValue>(SomeVeryLongValue.class),
+                                       BasicTypeInfo.BYTE_TYPE_INFO
+                               };
+                       
+                       final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, 
Byte>> typeInfo = 
+                                                               new 
TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types);
+
+                       final TypeSerializer<Tuple3<Long, SomeVeryLongValue, 
Byte>> serializer = typeInfo.createSerializer();
+
+                       
+                       channel = ioMan.createChannel();
+                       FileChannelOutputView out = new FileChannelOutputView(
+                                       
ioMan.createBlockChannelWriter(channel), memMan, memory, PAGE_SIZE);
+                       
+                       // add the test data
+                       Random rnd = new Random();
+                       List<Long> offsets = new ArrayList<Long>();
+                       
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               offsets.add(out.getWriteOffset());
+                               long val = rnd.nextLong();
+                               Tuple3<Long, SomeVeryLongValue, Byte> next = 
new Tuple3<Long, SomeVeryLongValue, Byte>(val, new SomeVeryLongValue((int) 
val), (byte) val);
+                               serializer.serialize(next, out);
+                       }
+                       
+                       out.close();
+                       
+                       for (int i = 1; i < offsets.size(); i++) {
+                               assertTrue(offsets.get(i) > offsets.get(i-1));
+                       }
+
+                       memMan.allocatePages(owner, memory, NUM_PAGES);
+                       
+                       SeekableFileChannelInputView in = new 
SeekableFileChannelInputView(ioMan, 
+                                       channel, memMan, memory, 
out.getBytesInLatestSegment());
+                       
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               System.out.println(i);
+                               in.seek(offsets.get(i));
+                               Tuple3<Long, SomeVeryLongValue, Byte> next = 
serializer.deserialize(in);
+                               
+                               // key and value must be equal
+                               assertTrue(next.f0.intValue() == next.f1.val());
+                               assertTrue(next.f0.byteValue() == next.f2);
+                       }
+                       
+                       in.closeAndDelete();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (channel != null) {
+                               try {
+                                       ioMan.deleteChannel(channel);
+                               } catch (IOException e) {}
+                       }
+                       
+                       if (ioMan != null) {
+                               ioMan.shutdown();
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
new file mode 100644
index 0000000..d2abd62
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+public class LargeRecordHandlerTest {
+
+       @Test
+       public void testEmptyRecordHandler() {
+               
+               final IOManager ioMan = new IOManagerAsync();
+               final int PAGE_SIZE = 4 * 1024;
+               final int NUM_PAGES = 50;
+               
+               try {
+                       final DefaultMemoryManager memMan = new 
DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+                       final AbstractInvokable owner = new DummyInvokable();
+                       final List<MemorySegment> memory = 
memMan.allocatePages(owner, NUM_PAGES);
+                       
+                       final TupleTypeInfo<Tuple2<Long, String>> typeInfo = 
(TupleTypeInfo<Tuple2<Long, String>>) 
+                                       TypeInfoParser.<Tuple2<Long, 
String>>parse("Tuple2<Long, String>");
+
+                       final TypeSerializer<Tuple2<Long, String>> serializer = 
typeInfo.createSerializer();
+                       final TypeComparator<Tuple2<Long, String>> comparator = 
typeInfo.createComparator(
+                                       new int[] {0}, new boolean[] {true}, 0);
+                       
+                       LargeRecordHandler<Tuple2<Long, String>> handler = new 
LargeRecordHandler<Tuple2<Long, String>>(
+                                       serializer, comparator, ioMan, memMan, 
memory, owner, 128);
+                       
+                       assertFalse(handler.hasData());
+                       
+                       handler.close();
+                       
+                       assertFalse(handler.hasData());
+                       
+                       handler.close();
+                       
+                       try {
+                               handler.addRecord(new Tuple2<Long, String>(92L, 
"peter pepper"));
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+                       
+                       assertTrue(memMan.verifyEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (ioMan != null) {
+                               ioMan.shutdown();
+                       }
+               }
+       }
+       
+       @Test
+       public void testRecordHandlerSingleKey() {
+               
+               final IOManager ioMan = new IOManagerAsync();
+               final int PAGE_SIZE = 4 * 1024;
+               final int NUM_PAGES = 24;
+               final int NUM_RECORDS = 25000;
+               
+               try {
+                       final DefaultMemoryManager memMan = new 
DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+                       final AbstractInvokable owner = new DummyInvokable();
+                       
+                       final List<MemorySegment> initialMemory = 
memMan.allocatePages(owner, 6);
+                       final List<MemorySegment> sortMemory = 
memMan.allocatePages(owner, NUM_PAGES - 6);
+                       
+                       final TupleTypeInfo<Tuple2<Long, String>> typeInfo = 
(TupleTypeInfo<Tuple2<Long, String>>) 
+                                       TypeInfoParser.<Tuple2<Long, 
String>>parse("Tuple2<Long, String>");
+
+                       final TypeSerializer<Tuple2<Long, String>> serializer = 
typeInfo.createSerializer();
+                       final TypeComparator<Tuple2<Long, String>> comparator = 
typeInfo.createComparator(
+                                       new int[] {0}, new boolean[] {true}, 0);
+                       
+                       LargeRecordHandler<Tuple2<Long, String>> handler = new 
LargeRecordHandler<Tuple2<Long, String>>(
+                                       serializer, comparator, ioMan, memMan, 
initialMemory, owner, 128);
+                       
+                       assertFalse(handler.hasData());
+                       
+                       
+                       // add the test data
+                       Random rnd = new Random();
+                       
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               long val = rnd.nextLong();
+                               handler.addRecord(new Tuple2<Long, String>(val, 
String.valueOf(val)));
+                               assertTrue(handler.hasData());
+                       }
+                       
+                       MutableObjectIterator<Tuple2<Long, String>> sorted = 
handler.finishWriteAndSortKeys(sortMemory);
+                       
+                       try {
+                               handler.addRecord(new Tuple2<Long, String>(92L, 
"peter pepper"));
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+                       
+                       Tuple2<Long, String> previous = null;
+                       Tuple2<Long, String> next;
+                       
+                       while ((next = sorted.next(null)) != null) {
+                               // key and value must be equal
+                               
assertTrue(next.f0.equals(Long.parseLong(next.f1)));
+                               
+                               // order must be correct
+                               if (previous != null) {
+                                       assertTrue(previous.f0 <= next.f0);
+                               }
+                               previous = next;
+                       }
+                       
+                       handler.close();
+                       
+                       assertFalse(handler.hasData());
+                       
+                       handler.close();
+                       
+                       try {
+                               handler.addRecord(new Tuple2<Long, String>(92L, 
"peter pepper"));
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+                       
+                       assertTrue(memMan.verifyEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (ioMan != null) {
+                               ioMan.shutdown();
+                       }
+               }
+       }
+       
+       @Test
+       public void testRecordHandlerCompositeKey() {
+               
+               final IOManager ioMan = new IOManagerAsync();
+               final int PAGE_SIZE = 4 * 1024;
+               final int NUM_PAGES = 24;
+               final int NUM_RECORDS = 25000;
+               
+               try {
+                       final DefaultMemoryManager memMan = new 
DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+                       final AbstractInvokable owner = new DummyInvokable();
+                       
+                       final List<MemorySegment> initialMemory = 
memMan.allocatePages(owner, 6);
+                       final List<MemorySegment> sortMemory = 
memMan.allocatePages(owner, NUM_PAGES - 6);
+                       
+                       final TupleTypeInfo<Tuple3<Long, String, Byte>> 
typeInfo = (TupleTypeInfo<Tuple3<Long, String, Byte>>) 
+                                       TypeInfoParser.<Tuple3<Long, String, 
Byte>>parse("Tuple3<Long, String, Byte>");
+
+                       final TypeSerializer<Tuple3<Long, String, Byte>> 
serializer = typeInfo.createSerializer();
+                       final TypeComparator<Tuple3<Long, String, Byte>> 
comparator = typeInfo.createComparator(
+                                       new int[] {2, 0}, new boolean[] {true, 
true}, 0);
+                       
+                       LargeRecordHandler<Tuple3<Long, String, Byte>> handler 
= new LargeRecordHandler<Tuple3<Long, String, Byte>>(
+                                       serializer, comparator, ioMan, memMan, 
initialMemory, owner, 128);
+                       
+                       assertFalse(handler.hasData());
+                       
+                       
+                       // add the test data
+                       Random rnd = new Random();
+                       
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               long val = rnd.nextLong();
+                               handler.addRecord(new Tuple3<Long, String, 
Byte>(val, String.valueOf(val), (byte) val));
+                               assertTrue(handler.hasData());
+                       }
+                       
+                       MutableObjectIterator<Tuple3<Long, String, Byte>> 
sorted = handler.finishWriteAndSortKeys(sortMemory);
+                       
+                       try {
+                               handler.addRecord(new Tuple3<Long, String, 
Byte>(92L, "peter pepper", (byte) 1));
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+                       
+                       Tuple3<Long, String, Byte> previous = null;
+                       Tuple3<Long, String, Byte> next;
+                       
+                       while ((next = sorted.next(null)) != null) {
+                               // key and value must be equal
+                               
assertTrue(next.f0.equals(Long.parseLong(next.f1)));
+                               assertTrue(next.f0.byteValue() == next.f2);
+                               
+                               // order must be correct
+                               if (previous != null) {
+                                       assertTrue(previous.f2 <= next.f2);
+                                       assertTrue(previous.f2.byteValue() != 
next.f2.byteValue() || previous.f0 <= next.f0);
+                               }
+                               previous = next;
+                       }
+                       
+                       handler.close();
+                       
+                       assertFalse(handler.hasData());
+                       
+                       handler.close();
+                       
+                       try {
+                               handler.addRecord(new Tuple3<Long, String, 
Byte>(92L, "peter pepper", (byte) 1));
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+                       
+                       assertTrue(memMan.verifyEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (ioMan != null) {
+                               ioMan.shutdown();
+                       }
+               }
+       }
+}

Reply via email to