Repository: flink Updated Branches: refs/heads/master 7610588bc -> 7df6a3d72
[FLINK-1296] [runtime] Limit memory consumption on merges by spilling larger records into a single special merge stream Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28a62d84 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28a62d84 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28a62d84 Branch: refs/heads/master Commit: 28a62d844e7a069a05383bb1e9e71f9daea41199 Parents: be8e1f1 Author: Stephan Ewen <[email protected]> Authored: Thu Dec 18 11:42:57 2014 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 21 12:01:35 2015 +0100 ---------------------------------------------------------------------- .../operators/hash/CompactingHashTable.java | 4 +- .../operators/sort/FixedLengthRecordSorter.java | 22 +- .../runtime/operators/sort/InMemorySorter.java | 14 +- .../operators/sort/NormalizedKeySorter.java | 98 ++-- .../runtime/operators/sort/SortBuffer.java | 510 ------------------- .../operators/sort/UnilateralSortMerger.java | 177 +------ .../apache/flink/runtime/util/IntArrayList.java | 28 +- .../operators/sort/ExternalSortITCase.java | 240 +-------- .../sort/ExternalSortLargeRecordsITCase.java | 455 +++++++++++++++++ .../sort/MassiveStringSortingITCase.java | 4 - 10 files changed, 570 insertions(+), 982 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java index 301aa82..7107972 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java @@ -1032,7 +1032,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{ int oldBucketCount = 0; int newBucketCount = 0; while(!hashList.isEmpty()) { - hash = hashList.removeInt(hashList.size()-1); + hash = hashList.removeLast(); pointer = pointerList.removeLong(pointerList.size()-1); posHashCode = hash % this.numBuckets; if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) { @@ -1061,7 +1061,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{ int bucketInSegmentPos = 0; MemorySegment bucket = null; while(!overflowHashes.isEmpty()) { - hash = overflowHashes.removeInt(overflowHashes.size()-1); + hash = overflowHashes.removeLast(); pointer = overflowPointers.removeLong(overflowPointers.size()-1); posHashCode = hash % this.numBuckets; bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits; http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java index a3766e7..cd982c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java @@ -186,25 +186,20 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> { return this.freeMemory; } - /** - * Gets the total capacity of this sorter, in bytes. - * - * @return The sorter's total capacity. 
- */ @Override public long getCapacity() { return ((long) this.totalNumBuffers) * this.segmentSize; } - /** - * Gets the number of bytes currently occupied in this sorter. - * - * @return The number of bytes occupied. - */ @Override public long getOccupancy() { return this.sortBufferBytes; } + + @Override + public long getNumRecordBytes() { + return this.sortBufferBytes; + } // ------------------------------------------------------------------------- // Retrieving and Writing @@ -430,6 +425,13 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> { } } + @Override + public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput) + throws IOException + { + writeToOutput(output); + } + /** * Writes a subset of the records in this buffer in their logical order to the given output. * http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java index 7dc4f97..633ec70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java @@ -25,7 +25,6 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; import org.apache.flink.util.MutableObjectIterator; - /** * */ @@ -58,13 +57,20 @@ public interface InMemorySorter<T> extends IndexedSortable { long getCapacity(); /** - * Gets the number of bytes currently occupied in this sorter. + * Gets the number of bytes currently occupied in this sorter, records and sort index. * * @return The number of bytes occupied. 
*/ long getOccupancy(); /** + * Gets the number of bytes occupied by records only. + * + * @return The number of bytes occupied by records. + */ + long getNumRecordBytes(); + + /** * Gets the record at the given logical position. * * @param reuse The reuse object to deserialize the record into. @@ -96,7 +102,9 @@ public interface InMemorySorter<T> extends IndexedSortable { * @param output The output view to write the records to. * @throws IOException Thrown, if an I/O exception occurred writing to the output view. */ - public void writeToOutput(final ChannelWriterOutputView output) throws IOException; + public void writeToOutput(ChannelWriterOutputView output) throws IOException; + + public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput) throws IOException; /** * Writes a subset of the records in this buffer in their logical order to the given output. http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java index c382708..97b9236 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java @@ -45,6 +45,8 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8; private static final int MIN_REQUIRED_BUFFERS = 3; + + private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024; // ------------------------------------------------------------------------ // Members @@ -227,25 +229,20 @@ public final class NormalizedKeySorter<T> 
implements InMemorySorter<T> { return this.freeMemory; } - /** - * Gets the total capacity of this sorter, in bytes. - * - * @return The sorter's total capacity. - */ @Override public long getCapacity() { return ((long) this.totalNumBuffers) * this.segmentSize; } - /** - * Gets the number of bytes currently occupied in this sorter. - * - * @return The number of bytes occupied. - */ @Override public long getOccupancy() { return this.currentDataBufferOffset + this.sortIndexBytes; } + + @Override + public long getNumRecordBytes() { + return this.currentDataBufferOffset; + } // ------------------------------------------------------------------------- // Retrieving and Writing @@ -285,22 +282,27 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { } } - // add the pointer and the normalized key - this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, this.currentDataBufferOffset); - if(this.numKeyBytes != 0) { - this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes); - } - // serialize the record into the data buffers try { this.serializer.serialize(record, this.recordCollector); - this.currentSortIndexOffset += this.indexEntrySize; - this.currentDataBufferOffset = this.recordCollector.getCurrentOffset(); - this.numRecords++; - return true; - } catch (EOFException eofex) { + } + catch (EOFException e) { return false; } + + final long newOffset = this.recordCollector.getCurrentOffset(); + final boolean shortRecord = newOffset - this.currentDataBufferOffset < LARGE_RECORD_THRESHOLD; + + // add the pointer and the normalized key + this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, shortRecord ? 
this.currentDataBufferOffset : -this.currentDataBufferOffset); + if (this.numKeyBytes != 0) { + this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes); + } + + this.currentSortIndexOffset += this.indexEntrySize; + this.currentDataBufferOffset = newOffset; + this.numRecords++; + return true; } // ------------------------------------------------------------------------ @@ -315,7 +317,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { final int bufferNum = logicalPosition / this.indexEntriesPerSegment; final int segmentOffset = logicalPosition % this.indexEntriesPerSegment; - return this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize); + return Math.abs(this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize)); } private final T getRecordFromBuffer(T reuse, long pointer) throws IOException { @@ -368,8 +370,8 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { return this.useNormKeyUninverted ? val : -val; } - final long pointerI = segI.getLong(segmentOffsetI); - final long pointerJ = segJ.getLong(segmentOffsetJ); + final long pointerI = Math.abs(segI.getLong(segmentOffsetI)); + final long pointerJ = Math.abs(segJ.getLong(segmentOffsetJ)); return compareRecords(pointerI, pointerJ); } @@ -422,7 +424,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { this.currentIndexSegment = sortIndex.get(++this.currentSegment); } - long pointer = this.currentIndexSegment.getLong(this.currentOffset); + long pointer = Math.abs(this.currentIndexSegment.getLong(this.currentOffset)); this.currentOffset += indexEntrySize; try { @@ -475,30 +477,34 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { * @throws IOException Thrown, if an I/O exception occurred writing to the output view. 
*/ @Override - public void writeToOutput(final ChannelWriterOutputView output) throws IOException { - int recordsLeft = this.numRecords; + public void writeToOutput(ChannelWriterOutputView output) throws IOException { + writeToOutput(output, null); + } + + @Override + public void writeToOutput(ChannelWriterOutputView output, LargeRecordHandler<T> largeRecordsOutput) + throws IOException + { + final int numRecords = this.numRecords; int currentMemSeg = 0; - while (recordsLeft > 0) - { + int currentRecord = 0; + + while (currentRecord < numRecords) { final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++); - int offset = 0; - // check whether we have a full or partially full segment - if (recordsLeft >= this.indexEntriesPerSegment) { - // full segment - for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) { - final long pointer = currentIndexSegment.getLong(offset); + + // go through all records in the memory segment + for (int offset = 0; currentRecord < numRecords && offset <= this.lastIndexEntryOffset; currentRecord++, offset += this.indexEntrySize) { + final long pointer = currentIndexSegment.getLong(offset); + + // small records go into the regular spill file, large records into the special code path + if (pointer >= 0) { this.recordBuffer.setReadPosition(pointer); this.serializer.copy(this.recordBuffer, output); - } - recordsLeft -= this.indexEntriesPerSegment; - } else { - // partially filled segment - for (; recordsLeft > 0; recordsLeft--, offset += this.indexEntrySize) - { - final long pointer = currentIndexSegment.getLong(offset); - this.recordBuffer.setReadPosition(pointer); - this.serializer.copy(this.recordBuffer, output); + else { + this.recordBuffer.setReadPosition(-pointer); + T record = this.serializer.deserialize(this.recordBuffer); + largeRecordsOutput.addRecord(record); } } } @@ -524,7 +530,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { if (num >= this.indexEntriesPerSegment 
&& offset == 0) { // full segment for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) { - final long pointer = currentIndexSegment.getLong(offset); + final long pointer = Math.abs(currentIndexSegment.getLong(offset)); this.recordBuffer.setReadPosition(pointer); this.serializer.copy(this.recordBuffer, output); } @@ -533,7 +539,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> { // partially filled segment for (; num > 0 && offset <= this.lastIndexEntryOffset; num--, offset += this.indexEntrySize) { - final long pointer = currentIndexSegment.getLong(offset); + final long pointer = Math.abs(currentIndexSegment.getLong(offset)); this.recordBuffer.setReadPosition(pointer); this.serializer.copy(this.recordBuffer, output); } http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java deleted file mode 100644 index be8511b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java +++ /dev/null @@ -1,510 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.operators.sort; - -import java.io.EOFException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.RandomAccessInputView; -import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView; -import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; -import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource; -import org.apache.flink.util.MutableObjectIterator; - -public class SortBuffer<T> implements InMemorySorter<T> { - - private static final int OFFSET_LEN = 8; - - private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16; - - private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8; - - private static final int MIN_REQUIRED_BUFFERS = 3; - - // ------------------------------------------------------------------------ - // Members - // ------------------------------------------------------------------------ - - private final byte[] swapBuffer; - - private final TypeSerializer<T> serializer; - - private final TypeComparator<T> comparator; - - private final SimpleCollectingOutputView recordCollector; - - private final RandomAccessInputView recordBuffer; - - private final RandomAccessInputView recordBufferForComparison; - - private MemorySegment currentSortIndexSegment; - - private final ArrayList<MemorySegment> freeMemory; - - 
private final ArrayList<MemorySegment> sortIndex; - - private final ArrayList<MemorySegment> recordBufferSegments; - - private long currentDataBufferOffset; - - private long sortIndexBytes; - - private int currentSortIndexOffset; - - private int numRecords; - - private final int numKeyBytes; - - private final int indexEntrySize; - - private final int indexEntriesPerSegment; - - private final int lastIndexEntryOffset; - - private final int segmentSize; - - private final int totalNumBuffers; - - private final boolean normalizedKeyFullyDetermines; - - private final boolean useNormKeyUninverted; - - - // ------------------------------------------------------------------------- - // Constructors / Destructors - // ------------------------------------------------------------------------- - - public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) { - this(serializer, comparator, memory, DEFAULT_MAX_NORMALIZED_KEY_LEN); - } - - public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> comparator, - List<MemorySegment> memory, int maxNormalizedKeyBytes) - { - if (serializer == null || comparator == null || memory == null) { - throw new NullPointerException(); - } - if (maxNormalizedKeyBytes < 0) { - throw new IllegalArgumentException("Maximal number of normalized key bytes must not be negative."); - } - - this.serializer = serializer; - this.comparator = comparator; - this.useNormKeyUninverted = !comparator.invertNormalizedKey(); - - // check the size of the first buffer and record it. all further buffers must have the same size. 
- // the size must also be a power of 2 - this.totalNumBuffers = memory.size(); - if (this.totalNumBuffers < MIN_REQUIRED_BUFFERS) { - throw new IllegalArgumentException("Normalized-Key sorter requires at least " + MIN_REQUIRED_BUFFERS + " memory buffers."); - } - this.segmentSize = memory.get(0).size(); - - if (memory instanceof ArrayList<?>) { - this.freeMemory = (ArrayList<MemorySegment>) memory; - } - else { - this.freeMemory = new ArrayList<MemorySegment>(memory.size()); - this.freeMemory.addAll(memory); - } - - // create the buffer collections - this.sortIndex = new ArrayList<MemorySegment>(16); - this.recordBufferSegments = new ArrayList<MemorySegment>(16); - - // the views for the record collections - this.recordCollector = new SimpleCollectingOutputView(this.recordBufferSegments, - new ListMemorySegmentSource(this.freeMemory), this.segmentSize); - this.recordBuffer = new RandomAccessInputView(this.recordBufferSegments, this.segmentSize); - this.recordBufferForComparison = new RandomAccessInputView(this.recordBufferSegments, this.segmentSize); - - // set up normalized key characteristics - if (this.comparator.supportsNormalizedKey()) { - // compute the max normalized key length - int numPartialKeys; - try { - numPartialKeys = this.comparator.getFlatComparators().length; - } catch (Throwable t) { - numPartialKeys = 1; - } - - int maxLen = Math.min(maxNormalizedKeyBytes, MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys); - - this.numKeyBytes = Math.min(this.comparator.getNormalizeKeyLen(), maxLen); - this.normalizedKeyFullyDetermines = !this.comparator.isNormalizedKeyPrefixOnly(this.numKeyBytes); - } - else { - this.numKeyBytes = 0; - this.normalizedKeyFullyDetermines = false; - } - - // compute the index entry size and limits - this.indexEntrySize = this.numKeyBytes + OFFSET_LEN; - this.indexEntriesPerSegment = segmentSize / this.indexEntrySize; - this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize; - this.swapBuffer = new 
byte[this.indexEntrySize]; - - // set to initial state - this.currentSortIndexSegment = nextMemorySegment(); - this.sortIndex.add(this.currentSortIndexSegment); - } - - // ------------------------------------------------------------------------- - // Memory Segment - // ------------------------------------------------------------------------- - - /** - * Resets the sort buffer back to the state where it is empty. All contained data is discarded. - */ - @Override - public void reset() { - // reset all offsets - this.numRecords = 0; - this.currentSortIndexOffset = 0; - this.currentDataBufferOffset = 0; - this.sortIndexBytes = 0; - - // return all memory - this.freeMemory.addAll(this.sortIndex); - this.freeMemory.addAll(this.recordBufferSegments); - this.sortIndex.clear(); - this.recordBufferSegments.clear(); - - // grab first buffers - this.currentSortIndexSegment = nextMemorySegment(); - this.sortIndex.add(this.currentSortIndexSegment); - this.recordCollector.reset(); - } - - /** - * Checks whether the buffer is empty. - * - * @return True, if no record is contained, false otherwise. - */ - @Override - public boolean isEmpty() { - return this.numRecords == 0; - } - - /** - * Collects all memory segments from this sorter. - * - * @return All memory segments from this sorter. - */ - @Override - public List<MemorySegment> dispose() { - this.freeMemory.addAll(this.sortIndex); - this.freeMemory.addAll(this.recordBufferSegments); - - this.recordBufferSegments.clear(); - this.sortIndex.clear(); - - return this.freeMemory; - } - - /** - * Gets the total capacity of this sorter, in bytes. - * - * @return The sorter's total capacity. - */ - @Override - public long getCapacity() { - return ((long) this.totalNumBuffers) * this.segmentSize; - } - - /** - * Gets the number of bytes currently occupied in this sorter. - * - * @return The number of bytes occupied. 
- */ - @Override - public long getOccupancy() { - return this.currentDataBufferOffset + this.sortIndexBytes; - } - - // ------------------------------------------------------------------------- - // Retrieving and Writing - // ------------------------------------------------------------------------- - - /** - * Gets the record at the given logical position. - * - * @param reuse The target object to deserialize the record into. - * @param logicalPosition The logical position of the record. - * @throws IOException Thrown, if an exception occurred during deserialization. - */ - @Override - public T getRecord(T reuse, int logicalPosition) throws IOException { - return getRecordFromBuffer(reuse, readPointer(logicalPosition)); - } - - /** - * Writes a given record to this sort buffer. The written record will be appended and take - * the last logical position. - * - * @param record The record to be written. - * @return True, if the record was successfully written, false, if the sort buffer was full. - * @throws IOException Thrown, if an error occurred while serializing the record into the buffers. 
- */ - @Override - public boolean write(T record) throws IOException { - //check whether we need a new memory segment for the sort index - if (this.currentSortIndexOffset > this.lastIndexEntryOffset) { - if (memoryAvailable()) { - this.currentSortIndexSegment = nextMemorySegment(); - this.sortIndex.add(this.currentSortIndexSegment); - this.currentSortIndexOffset = 0; - this.sortIndexBytes += this.segmentSize; - } else { - return false; - } - } - - // add the pointer and the normalized key - this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, this.currentDataBufferOffset); - if(this.numKeyBytes != 0) { - this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes); - } - - // serialize the record into the data buffers - try { - this.serializer.serialize(record, this.recordCollector); - this.currentSortIndexOffset += this.indexEntrySize; - this.currentDataBufferOffset = this.recordCollector.getCurrentOffset(); - this.numRecords++; - return true; - } catch (EOFException eofex) { - return false; - } - } - - // ------------------------------------------------------------------------ - // Access Utilities - // ------------------------------------------------------------------------ - - private final long readPointer(int logicalPosition) { - if (logicalPosition < 0 | logicalPosition >= this.numRecords) { - throw new IndexOutOfBoundsException(); - } - - final int bufferNum = logicalPosition / this.indexEntriesPerSegment; - final int segmentOffset = logicalPosition % this.indexEntriesPerSegment; - - return this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize); - } - - private final T getRecordFromBuffer(T reuse, long pointer) throws IOException { - this.recordBuffer.setReadPosition(pointer); - return this.serializer.deserialize(reuse, this.recordBuffer); - } - - private final int compareRecords(long pointer1, long pointer2) { - 
this.recordBuffer.setReadPosition(pointer1); - this.recordBufferForComparison.setReadPosition(pointer2); - - try { - return this.comparator.compareSerialized(this.recordBuffer, this.recordBufferForComparison); - } catch (IOException ioex) { - throw new RuntimeException("Error comparing two records.", ioex); - } - } - - private final boolean memoryAvailable() { - return !this.freeMemory.isEmpty(); - } - - private final MemorySegment nextMemorySegment() { - return this.freeMemory.remove(this.freeMemory.size() - 1); - } - - // ------------------------------------------------------------------------- - // Indexed Sorting - // ------------------------------------------------------------------------- - - @Override - public int compare(int i, int j) { - final int bufferNumI = i / this.indexEntriesPerSegment; - final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; - - final int bufferNumJ = j / this.indexEntriesPerSegment; - final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; - - final MemorySegment segI = this.sortIndex.get(bufferNumI); - final MemorySegment segJ = this.sortIndex.get(bufferNumJ); - - int val = MemorySegment.compare(segI, segJ, segmentOffsetI + OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes); - - if (val != 0 || this.normalizedKeyFullyDetermines) { - return this.useNormKeyUninverted ? 
val : -val; - } - - final long pointerI = segI.getLong(segmentOffsetI); - final long pointerJ = segJ.getLong(segmentOffsetJ); - - return compareRecords(pointerI, pointerJ); - } - - @Override - public void swap(int i, int j) { - final int bufferNumI = i / this.indexEntriesPerSegment; - final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; - - final int bufferNumJ = j / this.indexEntriesPerSegment; - final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; - - final MemorySegment segI = this.sortIndex.get(bufferNumI); - final MemorySegment segJ = this.sortIndex.get(bufferNumJ); - - MemorySegment.swapBytes(segI, segJ, this.swapBuffer, segmentOffsetI, segmentOffsetJ, this.indexEntrySize); - } - - @Override - public int size() { - return this.numRecords; - } - - // ------------------------------------------------------------------------- - - /** - * Gets an iterator over all records in this buffer in their logical order. - * - * @return An iterator returning the records in their logical order. 
- */ - @Override - public final MutableObjectIterator<T> getIterator() { - return new MutableObjectIterator<T>() - { - private final int size = size(); - private int current = 0; - - private int currentSegment = 0; - private int currentOffset = 0; - - private MemorySegment currentIndexSegment = sortIndex.get(0); - - @Override - public T next(T target) - { - if (this.current < this.size) { - this.current++; - if (this.currentOffset > lastIndexEntryOffset) { - this.currentOffset = 0; - this.currentIndexSegment = sortIndex.get(++this.currentSegment); - } - - long pointer = this.currentIndexSegment.getLong(this.currentOffset); - this.currentOffset += indexEntrySize; - - try { - return getRecordFromBuffer(target, pointer); - } - catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - else { - return null; - } - } - }; - } - - // ------------------------------------------------------------------------ - // Writing to a DataOutputView - // ------------------------------------------------------------------------ - - /** - * Writes the records in this buffer in their logical order to the given output. - * - * @param output The output view to write the records to. - * @throws IOException Thrown, if an I/O exception occurred writing to the output view. 
- */ - @Override - public void writeToOutput(final ChannelWriterOutputView output) throws IOException { - int recordsLeft = this.numRecords; - int currentMemSeg = 0; - while (recordsLeft > 0) - { - final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++); - int offset = 0; - // check whether we have a full or partially full segment - if (recordsLeft >= this.indexEntriesPerSegment) { - // full segment - for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) { - final long pointer = currentIndexSegment.getLong(offset); - this.recordBuffer.setReadPosition(pointer); - this.serializer.copy(this.recordBuffer, output); - - } - recordsLeft -= this.indexEntriesPerSegment; - } else { - // partially filled segment - for (; recordsLeft > 0; recordsLeft--, offset += this.indexEntrySize) - { - final long pointer = currentIndexSegment.getLong(offset); - this.recordBuffer.setReadPosition(pointer); - this.serializer.copy(this.recordBuffer, output); - } - } - } - } - - /** - * Writes a subset of the records in this buffer in their logical order to the given output. - * - * @param output The output view to write the records to. - * @param start The logical start position of the subset. - * @param num The number of elements to write. - * @throws IOException Thrown, if an I/O exception occurred writing to the output view. 
- */ - @Override - public void writeToOutput(final ChannelWriterOutputView output, final int start, int num) throws IOException { - int currentMemSeg = start / this.indexEntriesPerSegment; - int offset = (start % this.indexEntriesPerSegment) * this.indexEntrySize; - - while (num > 0) - { - final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++); - // check whether we have a full or partially full segment - if (num >= this.indexEntriesPerSegment && offset == 0) { - // full segment - for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) { - final long pointer = currentIndexSegment.getLong(offset); - this.recordBuffer.setReadPosition(pointer); - this.serializer.copy(this.recordBuffer, output); - } - num -= this.indexEntriesPerSegment; - } else { - // partially filled segment - for (; num > 0 && offset <= this.lastIndexEntryOffset; num--, offset += this.indexEntrySize) - { - final long pointer = currentIndexSegment.getLong(offset); - this.recordBuffer.setReadPosition(pointer); - this.serializer.copy(this.recordBuffer, output); - } - } - offset = 0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java index 51cc1cf..a534399 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memorymanager.MemoryAllocationException; import org.apache.flink.runtime.memorymanager.MemoryManager; import 
org.apache.flink.runtime.util.EmptyMutableObjectIterator; -import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; /** @@ -78,7 +77,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { /** The minimum number of segments that are required for the sort to operate. */ protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10; - + // ------------------------------------------------------------------------ // Threads // ------------------------------------------------------------------------ @@ -670,12 +669,13 @@ public class UnilateralSortMerger<E> implements Sorter<E> { * Class representing buffers that circulate between the reading, sorting and spilling thread. */ protected static final class CircularElement<E> { + final int id; final InMemorySorter<E> buffer; public CircularElement() { - this.buffer = null; this.id = -1; + this.buffer = null; } public CircularElement(int id, InMemorySorter<E> buffer) { @@ -770,8 +770,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> { internalHandleException(new IOException("Thread '" + getName() + "' terminated due to an exception: " + t.getMessage(), t)); } - finally { - } } /** @@ -915,6 +913,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // write the last leftover pair, if we have one if (leftoverRecord != null) { if (!buffer.write(leftoverRecord)) { + // did not fit in a fresh buffer, must be large... 
if (this.largeRecords != null) { this.largeRecords.addRecord(leftoverRecord); } else { @@ -923,6 +922,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { } buffer.reset(); } + leftoverRecord = null; } @@ -944,6 +944,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> { fullBuffer = true; break; } + + // successfully added record + if (bytesUntilSpilling - buffer.getOccupancy() <= 0) { bytesUntilSpilling = 0; @@ -1303,7 +1306,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> { if (LOG.isDebugEnabled()) { LOG.debug("Spilling buffer " + element.id + "."); } - element.buffer.writeToOutput(output); + + element.buffer.writeToOutput(output, largeRecordHandler); + if (LOG.isDebugEnabled()) { LOG.debug("Spilled buffer " + element.id + "."); } @@ -1655,166 +1660,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> { } } - /** - * - */ - public static final class InputDataCollector<E> implements Collector<E> - { - private final CircularQueues<E> queues; // the queues used to pass buffers - - private InMemorySorter<E> currentBuffer; - - private CircularElement<E> currentElement; - - private long bytesUntilSpilling; // number of bytes left before we signal to spill - - private boolean spillingInThisBuffer; - - private volatile boolean running; - - - public InputDataCollector(CircularQueues<E> queues, long startSpillingBytes) - { - this.queues = queues; - this.bytesUntilSpilling = startSpillingBytes; - this.running = true; - - grabBuffer(); - } + protected static final class ChannelWithBlockCount { - private void grabBuffer() - { - while (this.currentElement == null) { - try { - this.currentElement = this.queues.empty.take(); - } - catch (InterruptedException iex) { - if (this.running) { - LOG.error("Reading thread was interrupted (without being shut down) while grabbing a buffer. 
" + - "Retrying to grab buffer..."); - } else { - return; - } - } - } - - this.currentBuffer = this.currentElement.buffer; - if (!this.currentBuffer.isEmpty()) { - throw new RuntimeException("New sort-buffer is not empty."); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Retrieved empty read buffer " + this.currentElement.id + "."); - } - - this.spillingInThisBuffer = this.currentBuffer.getCapacity() <= this.bytesUntilSpilling; - } - - - @Override - public void collect(E record) - { - try { - if (this.spillingInThisBuffer) { - if (this.currentBuffer.write(record)) { - if (this.bytesUntilSpilling - this.currentBuffer.getOccupancy() <= 0) { - this.bytesUntilSpilling = 0; - // send the sentinel - this.queues.sort.add(UnilateralSortMerger.<E>spillingMarker()); - } - return; - } - } - else { - // no spilling in this buffer - if (this.currentBuffer.write(record)) { - return; - } - } - - if (this.bytesUntilSpilling > 0) { - this.bytesUntilSpilling -= this.currentBuffer.getCapacity(); - if (this.bytesUntilSpilling <= 0) { - this.bytesUntilSpilling = 0; - // send the sentinel - this.queues.sort.add(UnilateralSortMerger.<E>spillingMarker()); - } - } - - // we came here when the buffer could not be written. send it to the sorter - // send the buffer - if (LOG.isDebugEnabled()) { - LOG.debug("Emitting full buffer from reader thread: " + this.currentElement.id + "."); - } - this.queues.sort.add(this.currentElement); - this.currentElement = null; - - // we need a new buffer. grab the next one - while (this.running && this.currentElement == null) { - try { - this.currentElement = this.queues.empty.take(); - } - catch (InterruptedException iex) { - if (this.running) { - LOG.error("Reading thread was interrupted (without being shut down) while grabbing a buffer. 
" + - "Retrying to grab buffer..."); - } else { - return; - } - } - } - if (!this.running) { - return; - } - - this.currentBuffer = this.currentElement.buffer; - if (!this.currentBuffer.isEmpty()) { - throw new RuntimeException("BUG: New sort-buffer is not empty."); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Retrieved empty read buffer " + this.currentElement.id + "."); - } - // write the record - if (!this.currentBuffer.write(record)) { - throw new RuntimeException("Record could not be written to empty sort-buffer: Serialized record exceeds buffer capacity."); - } - } - catch (IOException ioex) { - throw new RuntimeException("BUG: An error occurred while writing a record to the sort buffer: " + - ioex.getMessage(), ioex); - } - } - - - @Override - public void close() - { - if (this.running) { - this.running = false; - - if (this.currentBuffer != null && this.currentElement != null) { - if (this.currentBuffer.isEmpty()) { - this.queues.empty.add(this.currentElement); - } - else { - this.queues.sort.add(this.currentElement); - if (LOG.isDebugEnabled()) { - LOG.debug("Emitting last buffer from input collector: " + this.currentElement.id + "."); - } - } - } - - this.currentBuffer = null; - this.currentElement = null; - - this.queues.sort.add(UnilateralSortMerger.<E>endMarker()); - } - } - } - - protected static final class ChannelWithBlockCount - { private final FileIOChannel.ID channel; private final int blockCount; http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java index 6782b91..999c4b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.util; +import java.util.NoSuchElementException; + /** * Minimal implementation of an array-backed list of ints */ @@ -42,16 +44,12 @@ public class IntArrayList { return true; } - public int removeInt(int index) { - if(index >= size) { - throw new IndexOutOfBoundsException("Index (" + index + ") is greater than or equal to list size (" + size + ")"); - } - final int old = array[ index ]; - size--; - if(index != size) { - System.arraycopy(array, index+1, array, index, size-index ); + public int removeLast() { + if (size == 0) { + throw new NoSuchElementException(); } - return old; + --size; + return array[size]; } public void clear() { @@ -59,7 +57,7 @@ public class IntArrayList { } public boolean isEmpty() { - return (size==0); + return size == 0; } private void grow(final int length) { @@ -71,4 +69,14 @@ public class IntArrayList { } } + public static final IntArrayList EMPTY = new IntArrayList(0) { + + public boolean add(int number) { + throw new UnsupportedOperationException(); + } + + public int removeLast() { + throw new UnsupportedOperationException(); + }; + }; } http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java index 914359d..6e35ad0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java @@ -18,29 +18,12 @@ package org.apache.flink.runtime.operators.sort; -import static org.junit.Assert.*; - -import org.junit.Assert; 
-import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; import java.util.Comparator; -import java.util.Random; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.common.typeutils.record.RecordComparator; import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -49,18 +32,21 @@ import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator; import org.apache.flink.runtime.operators.testutils.TestData; -import org.apache.flink.runtime.operators.testutils.TestData.Key; -import org.apache.flink.runtime.operators.testutils.TestData.Value; import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode; import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode; +import org.apache.flink.runtime.operators.testutils.TestData.Key; +import org.apache.flink.runtime.operators.testutils.TestData.Value; import org.apache.flink.runtime.operators.testutils.types.IntPair; import 
org.apache.flink.runtime.operators.testutils.types.IntPairComparator; import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer; import org.apache.flink.types.Record; import org.apache.flink.util.MutableObjectIterator; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ExternalSortITCase { @@ -77,7 +63,7 @@ public class ExternalSortITCase { private static final int NUM_PAIRS = 200000; - private static final int MEMORY_SIZE = 1024 * 1024 * 78; + private static final int MEMORY_SIZE = 1024 * 1024 * 78; private final AbstractInvokable parentTask = new DummyInvokable(); @@ -256,7 +242,7 @@ public class ExternalSortITCase { merger.close(); } -// @Test + @Test public void testSpillingSortWithIntermediateMerge() throws Exception { // amount of pairs final int PAIRS = 10000000; @@ -310,7 +296,7 @@ public class ExternalSortITCase { merger.close(); } -// @Test + @Test public void testSpillingSortWithIntermediateMergeIntPair() throws Exception { // amount of pairs final int PAIRS = 50000000; @@ -361,214 +347,4 @@ public class ExternalSortITCase { Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead); merger.close(); } - - @Test - public void testSortWithLongRecordsOnly() { - try { - final int NUM_RECORDS = 10; - - final TypeInformation<?>[] types = new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class) - }; - - final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = - new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); - final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); - final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); - - MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = - new 
MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() - { - private final Random rnd = new Random(); - private int num = 0; - - @Override - public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) { - if (num++ < NUM_RECORDS) { - long val = rnd.nextLong(); - return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val)); - } - else { - return null; - } - - } - }; - - @SuppressWarnings("unchecked") - Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>( - this.memoryManager, this.ioManager, - source, this.parentTask, - new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f); - - // check order - MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator(); - - Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance(); - - long prevKey = Long.MAX_VALUE; - - for (int i = 0; i < NUM_RECORDS; i++) { - val = iterator.next(val); - - assertTrue(val.f0 <= prevKey); - assertTrue(val.f0.intValue() == val.f1.val()); - } - - assertNull(iterator.next(val)); - - sorter.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSortWithLongAndShortRecordsMixed() { - try { - final int NUM_RECORDS = 1000000; - final int LARGE_REC_INTERVAL = 100000; - - final TypeInformation<?>[] types = new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class) - }; - - final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = - new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); - final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); - final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new 
boolean[]{false}, 0); - - MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = - new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() - { - private final Random rnd = new Random(); - private int num = -1; - - @Override - public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) { - if (++num < NUM_RECORDS) { - long val = rnd.nextLong(); - return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0)); - } - else { - return null; - } - - } - }; - - @SuppressWarnings("unchecked") - Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>( - this.memoryManager, this.ioManager, - source, this.parentTask, - new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f); - - // check order - MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator(); - - Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance(); - - long prevKey = Long.MAX_VALUE; - - for (int i = 0; i < NUM_RECORDS; i++) { - val = iterator.next(val); - - assertTrue(val.f0 <= prevKey); - assertTrue(val.f0.intValue() == val.f1.val()); - } - - assertNull(iterator.next(val)); - - sorter.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - - public static final class SomeMaybeLongValue implements org.apache.flink.types.Value { - - private static final long serialVersionUID = 1L; - - private static final byte[] BUFFER = new byte[100000000]; - - static { - for (int i = 0; i < BUFFER.length; i++) { - BUFFER[i] = (byte) i; - } - } - - private int val; - - private boolean isLong; - - - public SomeMaybeLongValue() { - this.isLong = true; - } - - public 
SomeMaybeLongValue(int val) { - this.val = val; - this.isLong = true; - } - - public SomeMaybeLongValue(int val, boolean isLong) { - this.val = val; - this.isLong = isLong; - } - - public int val() { - return val; - } - - public boolean isLong() { - return isLong; - } - - @Override - public void read(DataInputView in) throws IOException { - val = in.readInt(); - isLong = in.readBoolean(); - - if (isLong) { - for (int i = 0; i < BUFFER.length; i++) { - byte b = in.readByte(); - assertEquals(BUFFER[i], b); - } - } - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(val); - out.writeBoolean(isLong); - if (isLong) { - out.write(BUFFER); - } - } - - @Override - public int hashCode() { - return val; - } - - @Override - public boolean equals(Object obj) { - return (obj instanceof SomeMaybeLongValue) && ((SomeMaybeLongValue) obj).val == this.val; - } - - @Override - public String toString() { - return isLong ? "Large Value" : "Small Value"; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java new file mode 100644 index 0000000..33d15ae --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; +import 
org.junit.Before; +import org.junit.Test; + + +public class ExternalSortLargeRecordsITCase { + + private static final int MEMORY_SIZE = 1024 * 1024 * 78; + + private final AbstractInvokable parentTask = new DummyInvokable(); + + private IOManager ioManager; + + private MemoryManager memoryManager; + + // -------------------------------------------------------------------------------------------- + + @Before + public void beforeTest() { + this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O Manager was not properly shut down."); + } + + if (this.memoryManager != null) { + Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + // -------------------------------------------------------------------------------------------- + + @Test + public void testSortWithLongRecordsOnly() { + try { + final int NUM_RECORDS = 10; + + final TypeInformation<?>[] types = new TypeInformation<?>[] { + BasicTypeInfo.LONG_TYPE_INFO, + new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class) + }; + + final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = + new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); + final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); + final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); + + MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = + new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() + { + private final Random rnd = new Random(); + private int num = 0; + + @Override + public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) 
{ + if (num++ < NUM_RECORDS) { + long val = rnd.nextLong(); + return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val)); + } + else { + return null; + } + + } + }; + + @SuppressWarnings("unchecked") + Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>( + this.memoryManager, this.ioManager, + source, this.parentTask, + new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class), + comparator, 1.0, 1, 128, 0.7f); + + // check order + MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator(); + + Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance(); + + long prevKey = Long.MAX_VALUE; + + for (int i = 0; i < NUM_RECORDS; i++) { + val = iterator.next(val); + + assertTrue(val.f0 <= prevKey); + assertTrue(val.f0.intValue() == val.f1.val()); + } + + assertNull(iterator.next(val)); + + sorter.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSortWithLongAndShortRecordsMixed() { + try { + final int NUM_RECORDS = 1000000; + final int LARGE_REC_INTERVAL = 100000; + + final TypeInformation<?>[] types = new TypeInformation<?>[] { + BasicTypeInfo.LONG_TYPE_INFO, + new ValueTypeInfo<SomeMaybeLongValue>(SomeMaybeLongValue.class) + }; + + final TupleTypeInfo<Tuple2<Long, SomeMaybeLongValue>> typeInfo = + new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); + final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); + final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); + + MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = + new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() + { + private final Random rnd = new Random(); + private int num = -1; + + 
@Override + public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) { + if (++num < NUM_RECORDS) { + long val = rnd.nextLong(); + return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0)); + } + else { + return null; + } + + } + }; + + @SuppressWarnings("unchecked") + Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>( + this.memoryManager, this.ioManager, + source, this.parentTask, + new RuntimeStatefulSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class), + comparator, 1.0, 1, 128, 0.7f); + + // check order + MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator(); + + Tuple2<Long, SomeMaybeLongValue> val = serializer.createInstance(); + + long prevKey = Long.MAX_VALUE; + + for (int i = 0; i < NUM_RECORDS; i++) { + val = iterator.next(val); + + assertTrue(val.f0 <= prevKey); + assertTrue(val.f0.intValue() == val.f1.val()); + } + + assertNull(iterator.next(val)); + + sorter.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSortWithShortMediumAndLargeRecords() { + try { + final int NUM_RECORDS = 50000; + final int LARGE_REC_INTERVAL = 10000; + final int MEDIUM_REC_INTERVAL = 500; + + final TypeInformation<?>[] types = new TypeInformation<?>[] { + BasicTypeInfo.LONG_TYPE_INFO, + new ValueTypeInfo<SmallOrMediumOrLargeValue>(SmallOrMediumOrLargeValue.class) + }; + + final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = + new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types); + + final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(); + final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); 
+ + MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = + new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() + { + private final Random rnd = new Random(); + private int num = -1; + + @Override + public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) { + if (++num < NUM_RECORDS) { + + int size; + if (num % LARGE_REC_INTERVAL == 0) { + size = SmallOrMediumOrLargeValue.LARGE_SIZE; + } else if (num % MEDIUM_REC_INTERVAL == 0) { + size = SmallOrMediumOrLargeValue.MEDIUM_SIZE; + } else { + size = SmallOrMediumOrLargeValue.SMALL_SIZE; + } + + long val = rnd.nextLong(); + return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, size)); + } + else { + return null; + } + + } + }; + + @SuppressWarnings("unchecked") + Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>( + this.memoryManager, this.ioManager, + source, this.parentTask, + new RuntimeStatefulSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class), + comparator, 1.0, 1, 128, 0.7f); + + // check order + MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator(); + + Tuple2<Long, SmallOrMediumOrLargeValue> val = serializer.createInstance(); + + long prevKey = Long.MAX_VALUE; + + for (int i = 0; i < NUM_RECORDS; i++) { + val = iterator.next(val); + + assertTrue(val.f0 <= prevKey); + assertTrue(val.f0.intValue() == val.f1.val()); + } + + assertNull(iterator.next(val)); + + sorter.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + public static final class SomeMaybeLongValue implements org.apache.flink.types.Value { + + private static final long serialVersionUID = 
1L; + + private static final byte[] BUFFER = new byte[100 * 1024 * 1024]; + + static { + for (int i = 0; i < BUFFER.length; i++) { + BUFFER[i] = (byte) i; + } + } + + private int val; + + private boolean isLong; + + + public SomeMaybeLongValue() { + this.isLong = true; + } + + public SomeMaybeLongValue(int val) { + this.val = val; + this.isLong = true; + } + + public SomeMaybeLongValue(int val, boolean isLong) { + this.val = val; + this.isLong = isLong; + } + + public int val() { + return val; + } + + public boolean isLong() { + return isLong; + } + + @Override + public void read(DataInputView in) throws IOException { + val = in.readInt(); + isLong = in.readBoolean(); + + if (isLong) { + for (int i = 0; i < BUFFER.length; i++) { + byte b = in.readByte(); + assertEquals(BUFFER[i], b); + } + } + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(val); + out.writeBoolean(isLong); + if (isLong) { + out.write(BUFFER); + } + } + + @Override + public int hashCode() { + return val; + } + + @Override + public boolean equals(Object obj) { + return (obj instanceof SomeMaybeLongValue) && ((SomeMaybeLongValue) obj).val == this.val; + } + + @Override + public String toString() { + return isLong ? 
"Large Value" : "Small Value"; + } + } + + public static final class SmallOrMediumOrLargeValue implements org.apache.flink.types.Value { + + private static final long serialVersionUID = 1L; + + public static final int TYPE_SMALL = 0; + public static final int TYPE_MEDIUM = 1; + public static final int TYPE_LARGE = 2; + + public static final int SMALL_SIZE = 0; + public static final int MEDIUM_SIZE = 12 * 1024 * 1024; + public static final int LARGE_SIZE = 100 * 1024 * 1024; + + private int val; + + private int size; + + + public SmallOrMediumOrLargeValue() { + this.size = SMALL_SIZE; + } + + public SmallOrMediumOrLargeValue(int val) { + this.val = val; + this.size = SMALL_SIZE; + } + + public SmallOrMediumOrLargeValue(int val, int size) { + this.val = val; + this.size = size; + } + + public int val() { + return val; + } + + public int getSize() { + return size; + } + + @Override + public void read(DataInputView in) throws IOException { + val = in.readInt(); + size = in.readInt(); + + for (int i = 0; i < size; i++) { + byte b = in.readByte(); + assertEquals((byte) i, b); + } + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(val); + out.writeInt(size); + + for (int i = 0; i < size; i++) { + out.write((byte) (i)); + } + } + + @Override + public int hashCode() { + return val; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SmallOrMediumOrLargeValue) { + SmallOrMediumOrLargeValue other = (SmallOrMediumOrLargeValue) obj; + return other.val == this.val && other.size == this.size; + } else { + return false; + } + } + + @Override + public String toString() { + return String.format("Value %d (%d bytes)", val, size); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/28a62d84/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java index 55d01d2..a711d80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators.sort; import java.io.BufferedReader; @@ -48,9 +47,6 @@ public class MassiveStringSortingITCase { private static final long SEED = 347569784659278346L; - @SuppressWarnings("unused") - private static final char LINE_BREAK = '\n'; - public void testStringSorting() { File input = null;
