http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java index a340ef2..914359d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java @@ -16,18 +16,31 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators.sort; -import java.util.Comparator; +import static org.junit.Assert.*; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Random; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.common.typeutils.record.RecordComparator; import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; 
@@ -348,4 +361,230 @@ public class ExternalSortITCase {
 		Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead);
 		merger.close();
 	}
+
+	/**
+	 * Sorts records whose value field always carries a 100,000,000-byte payload and checks
+	 * that every record comes back intact and in descending key order.
+	 */
+	@Test
+	public void testSortWithLongRecordsOnly() {
+		try {
+			final int NUM_RECORDS = 10;
+
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+				};
+
+			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo =
+					new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source =
+					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
+			{
+				private final Random rnd = new Random();
+				private int num = 0;
+
+				@Override
+				public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+					if (num++ < NUM_RECORDS) {
+						long val = rnd.nextLong();
+						return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
+					}
+					else {
+						return null;
+					}
+
+				}
+			};
+
+			@SuppressWarnings("unchecked")
+			Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+					this.memoryManager, this.ioManager,
+					source, this.parentTask,
+					new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+					comparator, 1.0, 1, 128, 0.7f);
+
+			// check order
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
+
+			Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
+
+			long prevKey = Long.MAX_VALUE;
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				val = iterator.next(val);
+
+				// field 0 is sorted descending (order flag false), so keys must never increase
+				assertTrue(val.f0 <= prevKey);
+				assertTrue(val.f0.intValue() == val.f1.val());
+				prevKey = val.f0;
+			}
+
+			assertNull(iterator.next(val));
+
+			sorter.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Sorts one million records, every 100,000th of which carries a 100,000,000-byte payload
+	 * while the rest are small, and checks that all come back intact and in descending key order.
+	 */
+	@Test
+	public void testSortWithLongAndShortRecordsMixed() {
+		try {
+			final int NUM_RECORDS = 1000000;
+			final int LARGE_REC_INTERVAL = 100000;
+
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class)
+				};
+
+			final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo =
+					new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
+			final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source =
+					new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>()
+			{
+				private final Random rnd = new Random();
+				private int num = -1;
+
+				@Override
+				public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+					if (++num < NUM_RECORDS) {
+						long val = rnd.nextLong();
+						return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0));
+					}
+					else {
+						return null;
+					}
+
+				}
+			};
+
+			@SuppressWarnings("unchecked")
+			Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
+					this.memoryManager, this.ioManager,
+					source, this.parentTask,
+					new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
+					comparator, 1.0, 1, 128, 0.7f);
+
+			// check order
+			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
+
+			Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance();
+
+			long prevKey = Long.MAX_VALUE;
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				val = iterator.next(val);
+
+				// field 0 is sorted descending (order flag false), so keys must never increase
+				assertTrue(val.f0 <= prevKey);
+				assertTrue(val.f0.intValue() == val.f1.val());
+				prevKey = val.f0;
+			}
+
+			assertNull(iterator.next(val));
+
+			sorter.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * A {@link org.apache.flink.types.Value} that serializes an int and a flag, optionally followed
+	 * by a fixed 100,000,000-byte pattern that is verified byte by byte on deserialization.
+	 */
+	public static final class SomeMaybeLongValue implements org.apache.flink.types.Value {
+
+		private static final long serialVersionUID = 1L;
+
+		private static final byte[] BUFFER = new byte[100000000];
+
+		static {
+			for (int i = 0; i < BUFFER.length; i++) {
+				BUFFER[i] = (byte) i;
+			}
+		}
+
+		private int val;
+
+		private boolean isLong;
+
+
+		public SomeMaybeLongValue() {
+			this.isLong = true;
+		}
+
+		public SomeMaybeLongValue(int val) {
+			this.val = val;
+			this.isLong = true;
+		}
+
+		public SomeMaybeLongValue(int val, boolean isLong) {
+			this.val = val;
+			this.isLong = isLong;
+		}
+
+		public int val() {
+			return val;
+		}
+
+		public boolean isLong() {
+			return isLong;
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			val = in.readInt();
+			isLong = in.readBoolean();
+
+			if (isLong) {
+				for (int i = 0; i < BUFFER.length; i++) {
+					byte b = in.readByte();
+					assertEquals(BUFFER[i], b);
+				}
+			}
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(val);
+			out.writeBoolean(isLong);
+			if (isLong) {
+				out.write(BUFFER);
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return val;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return (obj instanceof SomeMaybeLongValue) && ((SomeMaybeLongValue) obj).val == this.val;
+		}
+
+		@Override
+		public String toString() {
+			return isLong ? "Large Value" : "Small Value";
+		}
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
new file mode 100644
index 0000000..3d237f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.FileChannelOutputView;
+import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+/**
+ * Integration tests for {@link LargeRecordHandler} with records whose serialized size
+ * (about 50 MB each) is far larger than the 4 KB memory pages used by the tests.
+ */
+public class LargeRecordHandlerITCase {
+
+	/**
+	 * Adds records with a ~50 MB value field, sorts them on the composite key (f2, f0), both
+	 * ascending, and checks that all records come back intact and in order, that the handler
+	 * rejects records after sorting and after closing, and that the memory manager is empty.
+	 */
+	@Test
+	public void testRecordHandlerCompositeKey() {
+
+		final IOManager ioMan = new IOManagerAsync();
+		final int PAGE_SIZE = 4 * 1024;
+		final int NUM_PAGES = 1000;
+		final int NUM_RECORDS = 10;
+
+		try {
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final AbstractInvokable owner = new DummyInvokable();
+
+			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
+			final List<MemorySegment> sortMemory = memMan.allocatePages(owner, NUM_PAGES - 6);
+
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SomeVeryLongValue>(SomeVeryLongValue.class),
+					BasicTypeInfo.BYTE_TYPE_INFO
+				};
+
+			final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, Byte>> typeInfo =
+					new TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types);
+
+			final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple3<Long, SomeVeryLongValue, Byte>> comparator = typeInfo.createComparator(
+					new int[] {2, 0}, new boolean[] {true, true}, 0);
+
+			LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, Byte>> handler = new LargeRecordHandler<Tuple3<Long, SomeVeryLongValue, Byte>>(
+					serializer, comparator, ioMan, memMan, initialMemory, owner, 128);
+
+			assertFalse(handler.hasData());
+
+
+			// add the test data
+			Random rnd = new Random();
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				long val = rnd.nextLong();
+				handler.addRecord(new Tuple3<Long, SomeVeryLongValue, Byte>(val, new SomeVeryLongValue((int) val), (byte) val));
+				assertTrue(handler.hasData());
+			}
+
+			MutableObjectIterator<Tuple3<Long, SomeVeryLongValue, Byte>> sorted = handler.finishWriteAndSortKeys(sortMemory);
+
+			try {
+				handler.addRecord(new Tuple3<Long, SomeVeryLongValue, Byte>(92L, null, (byte) 1));
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			Tuple3<Long, SomeVeryLongValue, Byte> previous = null;
+			Tuple3<Long, SomeVeryLongValue, Byte> next;
+			int count = 0;
+
+			while ((next = sorted.next(null)) != null) {
+				// key and value must be equal
+				assertTrue(next.f0.intValue() == next.f1.val());
+				assertTrue(next.f0.byteValue() == next.f2);
+
+				// order must be correct
+				if (previous != null) {
+					assertTrue(previous.f2 <= next.f2);
+					assertTrue(previous.f2.byteValue() != next.f2.byteValue() || previous.f0 <= next.f0);
+				}
+				previous = next;
+				count++;
+			}
+
+			// the number of returned records must match the number added
+			assertEquals(NUM_RECORDS, count);
+
+			handler.close();
+
+			assertFalse(handler.hasData());
+
+			handler.close();
+
+			try {
+				handler.addRecord(new Tuple3<Long, SomeVeryLongValue, Byte>(92L, null, (byte) 1));
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			assertTrue(memMan.verifyEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * A {@link Value} that serializes an int followed by a fixed 50,000,000-byte pattern; reading
+	 * it back asserts that every byte of the pattern is unchanged.
+	 */
+	public static final class SomeVeryLongValue implements Value {
+
+		private static final long serialVersionUID = 1L;
+
+		private static final byte[] BUFFER = new byte[50000000];
+
+		static {
+			for (int i = 0; i < BUFFER.length; i++) {
+				BUFFER[i] = (byte) i;
+			}
+		}
+
+		private int val;
+
+		public SomeVeryLongValue() {}
+
+		public SomeVeryLongValue(int val) {
+			this.val = val;
+		}
+
+		public int val() {
+			return val;
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			val = in.readInt();
+			for (int i = 0; i < BUFFER.length; i++) {
+				byte b = in.readByte();
+				assertEquals(BUFFER[i], b);
+			}
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(val);
+			out.write(BUFFER);
+		}
+	}
+
+
+	/**
+	 * Serializes large records into a file channel, remembering the start offset of each, then
+	 * seeks to every offset and checks that the record deserializes correctly.
+	 *
+	 * <p>Not executed: the {@code @Test} annotation is commented out.
+	 */
+//	@Test
+	public void fileTest() {
+
+		final IOManager ioMan = new IOManagerAsync();
+		final int PAGE_SIZE = 4 * 1024;
+		final int NUM_PAGES = 4;
+		final int NUM_RECORDS = 10;
+
+		FileIOChannel.ID channel = null;
+
+		try {
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final AbstractInvokable owner = new DummyInvokable();
+
+			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
+
+			final TypeInformation<?>[] types = new TypeInformation<?>[] {
+					BasicTypeInfo.LONG_TYPE_INFO,
+					new ValueTypeInfo<SomeVeryLongValue>(SomeVeryLongValue.class),
+					BasicTypeInfo.BYTE_TYPE_INFO
+				};
+
+			final TupleTypeInfo<Tuple3<Long, SomeVeryLongValue, Byte>> typeInfo =
+					new TupleTypeInfo<Tuple3<Long,SomeVeryLongValue,Byte>>(types);
+
+			final TypeSerializer<Tuple3<Long, SomeVeryLongValue, Byte>> serializer = typeInfo.createSerializer();
+
+
+			channel = ioMan.createChannel();
+			FileChannelOutputView out = new FileChannelOutputView(
+					ioMan.createBlockChannelWriter(channel), memMan, memory, PAGE_SIZE);
+
+			// add the test data
+			Random rnd = new Random();
+			List<Long> offsets = new ArrayList<Long>();
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				offsets.add(out.getWriteOffset());
+				long val = rnd.nextLong();
+				Tuple3<Long, SomeVeryLongValue, Byte> next = new Tuple3<Long, SomeVeryLongValue, Byte>(val, new SomeVeryLongValue((int) val), (byte) val);
+				serializer.serialize(next, out);
+			}
+
+			out.close();
+
+			for (int i = 1; i < offsets.size(); i++) {
+				assertTrue(offsets.get(i) > offsets.get(i-1));
+			}
+
+			memMan.allocatePages(owner, memory, NUM_PAGES);
+
+			SeekableFileChannelInputView in = new SeekableFileChannelInputView(ioMan,
+					channel, memMan, memory, out.getBytesInLatestSegment());
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				in.seek(offsets.get(i));
+				Tuple3<Long, SomeVeryLongValue, Byte> next = serializer.deserialize(in);
+
+				// key and value must be equal
+				assertTrue(next.f0.intValue() == next.f1.val());
+				assertTrue(next.f0.byteValue() == next.f2);
+			}
+
+			in.closeAndDelete();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (channel != null) {
+				try {
+					ioMan.deleteChannel(channel);
+				} catch (IOException e) {}
+			}
+
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
new file mode 100644
index 0000000..d2abd62
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+/**
+ * Tests for {@link LargeRecordHandler} using small {@code Tuple2<Long, String>} and
+ * {@code Tuple3<Long, String, Byte>} records.
+ */
+public class LargeRecordHandlerTest {
+
+	/**
+	 * Checks that a handler without records reports no data, tolerates being closed twice,
+	 * rejects records once closed, and leaves the memory manager empty.
+	 */
+	@Test
+	public void testEmptyRecordHandler() {
+
+		final IOManager ioMan = new IOManagerAsync();
+		final int PAGE_SIZE = 4 * 1024;
+		final int NUM_PAGES = 50;
+
+		try {
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final AbstractInvokable owner = new DummyInvokable();
+			final List<MemorySegment> memory = memMan.allocatePages(owner, NUM_PAGES);
+
+			final TupleTypeInfo<Tuple2<Long, String>> typeInfo = (TupleTypeInfo<Tuple2<Long, String>>)
+					TypeInfoParser.<Tuple2<Long, String>>parse("Tuple2<Long, String>");
+
+			final TypeSerializer<Tuple2<Long, String>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, String>> comparator = typeInfo.createComparator(
+					new int[] {0}, new boolean[] {true}, 0);
+
+			LargeRecordHandler<Tuple2<Long, String>> handler = new LargeRecordHandler<Tuple2<Long, String>>(
+					serializer, comparator, ioMan, memMan, memory, owner, 128);
+
+			assertFalse(handler.hasData());
+
+			handler.close();
+
+			assertFalse(handler.hasData());
+
+			handler.close();
+
+			try {
+				handler.addRecord(new Tuple2<Long, String>(92L, "peter pepper"));
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			assertTrue(memMan.verifyEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Adds 25,000 records, sorts them ascending on f0, and checks that all records come back
+	 * intact and in order, and that records are rejected after sorting and after closing.
+	 */
+	@Test
+	public void testRecordHandlerSingleKey() {
+
+		final IOManager ioMan = new IOManagerAsync();
+		final int PAGE_SIZE = 4 * 1024;
+		final int NUM_PAGES = 24;
+		final int NUM_RECORDS = 25000;
+
+		try {
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final AbstractInvokable owner = new DummyInvokable();
+
+			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
+			final List<MemorySegment> sortMemory = memMan.allocatePages(owner, NUM_PAGES - 6);
+
+			final TupleTypeInfo<Tuple2<Long, String>> typeInfo = (TupleTypeInfo<Tuple2<Long, String>>)
+					TypeInfoParser.<Tuple2<Long, String>>parse("Tuple2<Long, String>");
+
+			final TypeSerializer<Tuple2<Long, String>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple2<Long, String>> comparator = typeInfo.createComparator(
+					new int[] {0}, new boolean[] {true}, 0);
+
+			LargeRecordHandler<Tuple2<Long, String>> handler = new LargeRecordHandler<Tuple2<Long, String>>(
+					serializer, comparator, ioMan, memMan, initialMemory, owner, 128);
+
+			assertFalse(handler.hasData());
+
+
+			// add the test data
+			Random rnd = new Random();
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				long val = rnd.nextLong();
+				handler.addRecord(new Tuple2<Long, String>(val, String.valueOf(val)));
+				assertTrue(handler.hasData());
+			}
+
+			MutableObjectIterator<Tuple2<Long, String>> sorted = handler.finishWriteAndSortKeys(sortMemory);
+
+			try {
+				handler.addRecord(new Tuple2<Long, String>(92L, "peter pepper"));
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			Tuple2<Long, String> previous = null;
+			Tuple2<Long, String> next;
+			int count = 0;
+
+			while ((next = sorted.next(null)) != null) {
+				// key and value must be equal
+				assertTrue(next.f0.equals(Long.parseLong(next.f1)));
+
+				// order must be correct
+				if (previous != null) {
+					assertTrue(previous.f0 <= next.f0);
+				}
+				previous = next;
+				count++;
+			}
+
+			// the number of returned records must match the number added
+			assertEquals(NUM_RECORDS, count);
+
+			handler.close();
+
+			assertFalse(handler.hasData());
+
+			handler.close();
+
+			try {
+				handler.addRecord(new Tuple2<Long, String>(92L, "peter pepper"));
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			assertTrue(memMan.verifyEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Adds 25,000 records, sorts them on the composite key (f2, f0), both ascending, and checks
+	 * that all records come back intact and in order.
+	 */
+	@Test
+	public void testRecordHandlerCompositeKey() {
+
+		final IOManager ioMan = new IOManagerAsync();
+		final int PAGE_SIZE = 4 * 1024;
+		final int NUM_PAGES = 24;
+		final int NUM_RECORDS = 25000;
+
+		try {
+			final DefaultMemoryManager memMan = new DefaultMemoryManager(NUM_PAGES * PAGE_SIZE, 1, PAGE_SIZE);
+			final AbstractInvokable owner = new DummyInvokable();
+
+			final List<MemorySegment> initialMemory = memMan.allocatePages(owner, 6);
+			final List<MemorySegment> sortMemory = memMan.allocatePages(owner, NUM_PAGES - 6);
+
+			final TupleTypeInfo<Tuple3<Long, String, Byte>> typeInfo = (TupleTypeInfo<Tuple3<Long, String, Byte>>)
+					TypeInfoParser.<Tuple3<Long, String, Byte>>parse("Tuple3<Long, String, Byte>");
+
+			final TypeSerializer<Tuple3<Long, String, Byte>> serializer = typeInfo.createSerializer();
+			final TypeComparator<Tuple3<Long, String, Byte>> comparator = typeInfo.createComparator(
+					new int[] {2, 0}, new boolean[] {true, true}, 0);
+
+			LargeRecordHandler<Tuple3<Long, String, Byte>> handler = new LargeRecordHandler<Tuple3<Long, String, Byte>>(
+					serializer, comparator, ioMan, memMan, initialMemory, owner, 128);
+
+			assertFalse(handler.hasData());
+
+
+			// add the test data
+			Random rnd = new Random();
+
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				long val = rnd.nextLong();
+				handler.addRecord(new Tuple3<Long, String, Byte>(val, String.valueOf(val), (byte) val));
+				assertTrue(handler.hasData());
+			}
+
+			MutableObjectIterator<Tuple3<Long, String, Byte>> sorted = handler.finishWriteAndSortKeys(sortMemory);
+
+			try {
+				handler.addRecord(new Tuple3<Long, String, Byte>(92L, "peter pepper", (byte) 1));
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			Tuple3<Long, String, Byte> previous = null;
+			Tuple3<Long, String, Byte> next;
+			int count = 0;
+
+			while ((next = sorted.next(null)) != null) {
+				// key and value must be equal
+				assertTrue(next.f0.equals(Long.parseLong(next.f1)));
+				assertTrue(next.f0.byteValue() == next.f2);
+
+				// order must be correct
+				if (previous != null) {
+					assertTrue(previous.f2 <= next.f2);
+					assertTrue(previous.f2.byteValue() != next.f2.byteValue() || previous.f0 <= next.f0);
+				}
+				previous = next;
+				count++;
+			}
+
+			// the number of returned records must match the number added
+			assertEquals(NUM_RECORDS, count);
+
+			handler.close();
+
+			assertFalse(handler.hasData());
+
+			handler.close();
+
+			try {
+				handler.addRecord(new Tuple3<Long, String, Byte>(92L, "peter pepper", (byte) 1));
+				fail("should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			assertTrue(memMan.verifyEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
+		}
+	}
+}
