[FLINK-2754] Fix FixedLengthRecordSorter writing to multiple memory pages and add more unit tests.
This closes #1178 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68912126 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68912126 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68912126 Branch: refs/heads/master Commit: 68912126d73b92a07d15ec3f21f9ac922744fb45 Parents: e727355 Author: chengxiang li <[email protected]> Authored: Thu Sep 24 11:20:10 2015 +0800 Committer: Stephan Ewen <[email protected]> Committed: Tue Sep 29 12:18:49 2015 +0200 ---------------------------------------------------------------------- .../operators/sort/FixedLengthRecordSorter.java | 4 +- .../sort/FixedLengthRecordSorterTest.java | 109 +++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/68912126/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java index da96b17..3a44ab5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java @@ -447,11 +447,13 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> { num -= recordsPerSegment; } else { // partially filled segment - for (; num > 0; num--) { + for (; num > 0 && offset <= this.lastEntryOffset; num--, offset += this.recordSize) { record = comparator.readWithKeyDenormalization(record, inView); serializer.serialize(record, output); } } + + offset = 0; } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/68912126/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java index 517bec3..288e86d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java @@ -25,6 +25,14 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator; @@ -48,6 +56,8 @@ public class FixedLengthRecordSorterTest { private static final int MEMORY_PAGE_SIZE = 32 * 1024; private MemoryManager memoryManager; + + private IOManager ioManager; private TypeSerializer<IntPair> serializer; @@ -57,6 +67,7 @@ public class FixedLengthRecordSorterTest { @Before public 
void beforeTest() { this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, MemoryType.HEAP, true); + this.ioManager = new IOManagerAsync(); this.serializer = new IntPairSerializer(); this.comparator = new IntPairComparator(); } @@ -368,4 +379,102 @@ public class FixedLengthRecordSorterTest { sorter.dispose(); this.memoryManager.release(memory); } + + @Test + public void testFlushFullMemoryPage() throws Exception { + // Insert IntPair which would fill 2 memory pages. + final int NUM_RECORDS = 2 * MEMORY_PAGE_SIZE / 8; + final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), 3); + + FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory); + UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, false); + + // write the records + IntPair record = new IntPair(); + int num = -1; + do { + generator.next(record); + num++; + } + while (sorter.write(record) && num < NUM_RECORDS); + + FileIOChannel.ID channelID = this.ioManager.createChannelEnumerator().next(); + BlockChannelWriter<MemorySegment> blockChannelWriter = this.ioManager.createBlockChannelWriter(channelID); + final List<MemorySegment> writeBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3); + ChannelWriterOutputView outputView = new ChannelWriterOutputView(blockChannelWriter, writeBuffer, writeBuffer.get(0).size()); + + sorter.writeToOutput(outputView, 0, NUM_RECORDS); + + this.memoryManager.release(outputView.close()); + + BlockChannelReader<MemorySegment> blockChannelReader = this.ioManager.createBlockChannelReader(channelID); + final List<MemorySegment> readBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3); + ChannelReaderInputView readerInputView = new ChannelReaderInputView(blockChannelReader, readBuffer, false); + final List<MemorySegment> dataBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3); + ChannelReaderInputViewIterator<IntPair> iterator = new 
ChannelReaderInputViewIterator(readerInputView, dataBuffer, this.serializer); + + record = iterator.next(record); + int i =0; + while (record != null) { + Assert.assertEquals(i, record.getKey()); + record = iterator.next(record); + i++; + } + + Assert.assertEquals(NUM_RECORDS, i); + + this.memoryManager.release(dataBuffer); + // release the memory occupied by the buffers + sorter.dispose(); + this.memoryManager.release(memory); + } + + @Test + public void testFlushPartialMemoryPage() throws Exception { + // Insert IntPair which would fill 2 memory pages. + final int NUM_RECORDS = 2 * MEMORY_PAGE_SIZE / 8; + final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), 3); + + FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory); + UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, false); + + // write the records + IntPair record = new IntPair(); + int num = -1; + do { + generator.next(record); + num++; + } + while (sorter.write(record) && num < NUM_RECORDS); + + FileIOChannel.ID channelID = this.ioManager.createChannelEnumerator().next(); + BlockChannelWriter<MemorySegment> blockChannelWriter = this.ioManager.createBlockChannelWriter(channelID); + final List<MemorySegment> writeBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3); + ChannelWriterOutputView outputView = new ChannelWriterOutputView(blockChannelWriter, writeBuffer, writeBuffer.get(0).size()); + + sorter.writeToOutput(outputView, 1, NUM_RECORDS - 1); + + this.memoryManager.release(outputView.close()); + + BlockChannelReader<MemorySegment> blockChannelReader = this.ioManager.createBlockChannelReader(channelID); + final List<MemorySegment> readBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3); + ChannelReaderInputView readerInputView = new ChannelReaderInputView(blockChannelReader, readBuffer, false); + final List<MemorySegment> dataBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3); 
+ ChannelReaderInputViewIterator<IntPair> iterator = new ChannelReaderInputViewIterator(readerInputView, dataBuffer, this.serializer); + + record = iterator.next(record); + int i =1; + while (record != null) { + Assert.assertEquals(i, record.getKey()); + record = iterator.next(record); + i++; + } + + Assert.assertEquals(NUM_RECORDS, i); + + this.memoryManager.release(dataBuffer); + // release the memory occupied by the buffers + sorter.dispose(); + this.memoryManager.release(memory); + } }
