[FLINK-1296] [runtime] Add sorter support for very large records
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48256560 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48256560 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48256560 Branch: refs/heads/master Commit: 482565608414fea8df7ed86d5d9a8d90fdfcd4f7 Parents: 996d404 Author: Stephan Ewen <[email protected]> Authored: Tue Dec 2 22:20:26 2014 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 21 12:01:35 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/api/java/tuple/Tuple.java | 28 + .../flink/api/java/tuple/TupleGenerator.java | 12 +- .../flink/api/java/typeutils/TupleTypeInfo.java | 13 +- .../RuntimeStatefulSerializerFactory.java | 3 + .../typeutils/runtime/TupleSerializerBase.java | 3 + flink-runtime/pom.xml | 14 +- .../runtime/io/disk/FileChannelOutputView.java | 4 + .../io/disk/SeekableFileChannelInputView.java | 2 +- .../runtime/io/disk/iomanager/IOManager.java | 32 +- .../sort/CombiningUnilateralSortMerger.java | 14 +- .../operators/sort/LargeRecordHandler.java | 451 ++++++++++++++++ .../runtime/operators/sort/SortBuffer.java | 510 +++++++++++++++++++ .../operators/sort/UnilateralSortMerger.java | 433 +++++++++------- .../operators/sort/ExternalSortITCase.java | 227 ++++++++- .../sort/LargeRecordHandlerITCase.java | 277 ++++++++++ .../operators/sort/LargeRecordHandlerTest.java | 271 ++++++++++ 16 files changed, 2062 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java index 5e2b8a2..145d215 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/Tuple.java @@ -81,4 +81,32 @@ public abstract class Tuple implements java.io.Serializable { * @return The number of fields in the tuple. */ public abstract int getArity(); + + // -------------------------------------------------------------------------------------------- + + /** + * Gets the class corresponding to the tuple of the given arity (dimensions). For + * example, {@code getTupleClass(3)} will return the {@code Tuple3.class}. + * + * @param arity The arity of the tuple class to get. + * @return The tuple class with the given arity. + */ + @SuppressWarnings("unchecked") + public static Class<? extends Tuple> getTupleClass(int arity) { + if (arity < 1 || arity > MAX_ARITY) { + throw new IllegalArgumentException("The tuple arity must be in [1, " + MAX_ARITY + "]."); + } + return (Class<? extends Tuple>) CLASSES[arity - 1]; + } + + // -------------------------------------------------------------------------------------------- + // The following lines are generated. + // -------------------------------------------------------------------------------------------- + + // BEGIN_OF_TUPLE_DEPENDENT_CODE + // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+ private static final Class<?>[] CLASSES = new Class<?>[] { + Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class + }; + // END_OF_TUPLE_DEPENDENT_CODE } http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java index 3bfa94b..ed429e3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -53,9 +53,9 @@ class TupleGenerator { private static final String CSV_READER_CLASSNAME = "CsvReader"; // Parameters for TupleTypeInfo - private static final String TUPLE_TYPE_INFO_PACKAGE = "org.apache.flink.api.java.typeutils"; + private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple"; - private static final String TUPLE_TYPE_INFO_CLASSNAME = "TupleTypeInfo"; + private static final String TUPLE_CLASSNAME = "Tuple"; // Parameters for ProjectOperator private static final String PROJECT_OPERATOR_PACKAGE = "org.apache.flink.api.java.operators"; @@ -92,7 +92,7 @@ class TupleGenerator { modifyCsvReader(root); - modifyTupleTypeInfo(root); + modifyTupleType(root); modifyProjectOperator(root); @@ -395,7 +395,7 @@ class TupleGenerator { insertCodeIntoFile(sb.toString(), projectOperatorClass); } - private static void modifyTupleTypeInfo(File root) throws IOException { + private static void modifyTupleType(File root) throws 
IOException { // generate code StringBuilder sb = new StringBuilder(); sb.append("\tprivate static final Class<?>[] CLASSES = new Class<?>[] {\n\t\t"); @@ -408,8 +408,8 @@ class TupleGenerator { sb.append("\n\t};"); // insert code into file - File dir = getPackage(root, TUPLE_TYPE_INFO_PACKAGE); - File tupleTypeInfoClass = new File(dir, TUPLE_TYPE_INFO_CLASSNAME + ".java"); + File dir = getPackage(root, TUPLE_PACKAGE); + File tupleTypeInfoClass = new File(dir, TUPLE_CLASSNAME + ".java"); insertCodeIntoFile(sb.toString(), tupleTypeInfoClass); } http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index 177f033..af258e2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -37,7 +37,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { @SuppressWarnings("unchecked") public TupleTypeInfo(TypeInformation<?>... types) { - this((Class<T>) CLASSES[types.length - 1], types); + this((Class<T>) Tuple.getTupleClass(types.length), types); } public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) { @@ -152,15 +152,4 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> { TupleTypeInfo<X> tupleInfo = (TupleTypeInfo<X>) new TupleTypeInfo<Tuple>(infos); return tupleInfo; } - - // -------------------------------------------------------------------------------------------- - // The following lines are generated. 
- // -------------------------------------------------------------------------------------------- - - // BEGIN_OF_TUPLE_DEPENDENT_CODE - // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. - private static final Class<?>[] CLASSES = new Class<?>[] { - Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class - }; - // END_OF_TUPLE_DEPENDENT_CODE } http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java index 19cd3b7..dcc31bf 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java @@ -55,6 +55,9 @@ public final class RuntimeStatefulSerializerFactory<T> implements TypeSerializer } } + public void setClassLoader(ClassLoader loader) { + this.loader = loader; + } @Override public void writeParametersToConfig(Configuration config) { http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index 08df7d3..a30eda3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -55,6 +55,9 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { this.stateful = stateful; } + public Class<T> getTupleClass() { + return this.tupleClass; + } @Override public boolean isImmutableType() { http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 692e6e1..5ac12d5 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -40,6 +40,12 @@ under the License. <artifactId>flink-core</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>commons-cli</groupId> @@ -70,7 +76,6 @@ under the License. <version>1.8.1</version> </dependency> - <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> @@ -92,13 +97,6 @@ under the License. 
</dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java index 2b8b728..e04759c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java @@ -128,6 +128,10 @@ public class FileChannelOutputView extends AbstractPagedOutputView { return bytesInLatestSegment; } + public long getWriteOffset() { + return ((long) numBlocksWritten) * segmentSize + getCurrentPositionInSegment(); + } + @Override protected MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException { if (current != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java index e97a1ff..6098fdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java @@ -116,7 +116,7 @@ public class 
SeekableFileChannelInputView extends AbstractPagedInputView { reader = ioManager.createBlockChannelReader(channelId); if (block > 0) { - reader.seekToPosition(block * segmentSize); + reader.seekToPosition(((long) block) * segmentSize); } this.numBlocksRemaining = this.numBlocksTotal - block; http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index ff2fb85..e58c4d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -97,6 +97,19 @@ public abstract class IOManager { return new FileIOChannel.Enumerator(this.paths, this.random); } + /** + * Deletes the file underlying the given channel. If the channel is still open, this + * call may fail. + * + * @param channel The channel to be deleted. + * @throws IOException Thrown if the deletion fails. 
+ */ + public void deleteChannel(FileIOChannel.ID channel) throws IOException { + if (channel != null && !new File(channel.getPath()).delete() && new File(channel.getPath()).exists()) { + throw new IOException("Could not delete the file underlying channel " + channel.getPath()); + } + } + // ------------------------------------------------------------------------ // Reader / Writer instantiations // ------------------------------------------------------------------------ @@ -179,11 +192,20 @@ public abstract class IOManager { */ public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException; - - // ======================================================================== - // Utilities - // ======================================================================== - + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Gets the number of directories across which the I/O manager rotates its files. + * + * @return The number of temporary file directories.
+ */ + public int getNumberOfTempDirs() { + return this.paths.length; + } + protected int getNextPathNum() { final int next = this.nextPath; final int newNext = next + 1; http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java index 5d4c881..35297ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java @@ -141,7 +141,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { throws IOException, MemoryAllocationException { super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false); + memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true); this.combineStub = combineStub; } @@ -243,7 +243,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { } disposeSortBuffers(true); - //
set result iterator + // set lazy iterator MutableObjectIterator<E> resIter = iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() : iterators.size() == 1 ? iterators.get(0) : new MergeIterator<E>(iterators, this.comparator); @@ -392,8 +392,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // ------------------- Merging Phase ------------------------ // merge channels until sufficient file handles are available - while (isRunning() && channelIDs.size() > this.maxNumFileHandles) { - channelIDs = mergeChannelList(channelIDs, this.sortReadMemory, this.writeMemory); + while (isRunning() && channelIDs.size() > this.maxFanIn) { + channelIDs = mergeChannelList(channelIDs, this.mergeReadMemory, this.writeMemory); } // from here on, we won't write again @@ -413,7 +413,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelIDs.size()); // allocate the read memory and register it to be released - getSegmentsForReaders(readBuffers, this.sortReadMemory, channelIDs.size()); + getSegmentsForReaders(readBuffers, this.mergeReadMemory, channelIDs.size()); // get the readers and register them to be released final MergeIterator<E> mergeIterator = getMergingIterator( @@ -452,7 +452,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size()); // the list with the target iterators - final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses); + final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses, null); final ReusingKeyGroupedIterator<E> groupedIter = new ReusingKeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2); // create a new channel writer 
http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java new file mode 100644 index 0000000..83e003f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.FileChannelInputView; +import org.apache.flink.runtime.io.disk.FileChannelOutputView; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.types.NullKeyFieldException; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LargeRecordHandler<T> { + + private static final Logger LOG = LoggerFactory.getLogger(LargeRecordHandler.class); + + private static final int MIN_SEGMENTS_FOR_KEY_SPILLING = 1; + + private static final int 
MAX_SEGMENTS_FOR_KEY_SPILLING = 4; + + // -------------------------------------------------------------------------------------------- + + private final TypeSerializer<T> serializer; + + private final TypeComparator<T> comparator; + + private TupleSerializer<Tuple> keySerializer; + + private TupleComparator<Tuple> keyComparator; + + private FileChannelOutputView recordsOutFile; + + private FileChannelOutputView keysOutFile; + + private Tuple keyTuple; + + private FileChannelInputView keysReader; + + private SeekableFileChannelInputView recordsReader; + + private FileIOChannel.ID recordsChannel; + + private FileIOChannel.ID keysChannel; + + private final IOManager ioManager; + + private final MemoryManager memManager; + + private final List<MemorySegment> memory; + + private TypeSerializerFactory<Tuple> keySerializerFactory; + + private UnilateralSortMerger<Tuple> keySorter; + + private final AbstractInvokable memoryOwner; + + private long recordCounter; + + private int numKeyFields; + + private final int maxFilehandles; + + private volatile boolean closed; + + // -------------------------------------------------------------------------------------------- + + public LargeRecordHandler(TypeSerializer<T> serializer, TypeComparator<T> comparator, + IOManager ioManager, MemoryManager memManager, List<MemorySegment> memory, + AbstractInvokable memoryOwner, int maxFilehandles) + { + this.serializer = checkNotNull(serializer); + this.comparator = checkNotNull(comparator); + this.ioManager = checkNotNull(ioManager); + this.memManager = checkNotNull(memManager); + this.memory = checkNotNull(memory); + this.memoryOwner = checkNotNull(memoryOwner); + this.maxFilehandles = maxFilehandles; + + checkArgument(maxFilehandles >= 2); + } + + + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + public long addRecord(T record) throws IOException { + + if (recordsOutFile == null) { + + if (closed) { + 
throw new IllegalStateException("The large record handler has been closed."); + } + if (recordsReader != null) { + throw new IllegalStateException("The handler has already switched to sorting."); + } + + LOG.debug("Initializing the large record spilling..."); + + // initialize the utilities + { + final TypeComparator<?>[] keyComps = comparator.getFlatComparators(); + numKeyFields = keyComps.length; + Object[] keyHolder = new Object[numKeyFields]; + + comparator.extractKeys(record, keyHolder, 0); + + TypeSerializer<?>[] keySers = new TypeSerializer<?>[numKeyFields]; + TypeSerializer<?>[] tupleSers = new TypeSerializer<?>[numKeyFields + 1]; + + int[] keyPos = new int[numKeyFields]; + + for (int i = 0; i < numKeyFields; i++) { + keyPos[i] = i; + keySers[i] = createSerializer(keyHolder[i], i); + tupleSers[i] = keySers[i]; + } + // add the long serializer for the offset + tupleSers[numKeyFields] = LongSerializer.INSTANCE; + + keySerializer = new TupleSerializer<Tuple>((Class<Tuple>) Tuple.getTupleClass(numKeyFields+1), tupleSers); + keyComparator = new TupleComparator<Tuple>(keyPos, keyComps, keySers); + + // create the serializer factory for the tuple serializer + if (keySerializer.isStateful()) { + ClassLoader cl = getClassLoader(tupleSers); + + RuntimeStatefulSerializerFactory<Tuple> factory = + new RuntimeStatefulSerializerFactory<Tuple>(keySerializer, keySerializer.getTupleClass()); + factory.setClassLoader(cl); + keySerializerFactory = factory; + } + else { + keySerializerFactory = new RuntimeStatelessSerializerFactory<Tuple>(keySerializer, keySerializer.getTupleClass()); + } + + keyTuple = keySerializer.createInstance(); + } + + // initialize the spilling + final int totalNumSegments = memory.size(); + final int segmentsForKeys = (totalNumSegments >= 2*MAX_SEGMENTS_FOR_KEY_SPILLING) ? 
MAX_SEGMENTS_FOR_KEY_SPILLING : + Math.max(MIN_SEGMENTS_FOR_KEY_SPILLING, totalNumSegments - MAX_SEGMENTS_FOR_KEY_SPILLING); + + List<MemorySegment> recordsMemory = new ArrayList<MemorySegment>(); + List<MemorySegment> keysMemory = new ArrayList<MemorySegment>(); + + for (int i = 0; i < segmentsForKeys; i++) { + keysMemory.add(memory.get(i)); + } + for (int i = segmentsForKeys; i < totalNumSegments; i++) { + recordsMemory.add(memory.get(i)); + } + + recordsChannel = ioManager.createChannel(); + keysChannel = ioManager.createChannel(); + + recordsOutFile = new FileChannelOutputView( + ioManager.createBlockChannelWriter(recordsChannel), memManager, + recordsMemory, memManager.getPageSize()); + + keysOutFile = new FileChannelOutputView( + ioManager.createBlockChannelWriter(keysChannel), memManager, + keysMemory, memManager.getPageSize()); + } + + final long offset = recordsOutFile.getWriteOffset(); + if (offset < 0) { + throw new IllegalStateException("Invalid negative write offset (" + offset + ") in the large record spill file."); + } + + Object[] keyHolder = new Object[numKeyFields]; + + comparator.extractKeys(record, keyHolder, 0); + for (int i = 0; i < numKeyFields; i++) { + keyTuple.setField(keyHolder[i], i); + } + keyTuple.setField(offset, numKeyFields); + + keySerializer.serialize(keyTuple, keysOutFile); + serializer.serialize(record, recordsOutFile); + + recordCounter++; + + return offset; + } + + public MutableObjectIterator<T> finishWriteAndSortKeys(List<MemorySegment> memory) throws IOException { + if (recordsOutFile == null || keysOutFile == null) { + throw new IllegalStateException("The LargeRecordHandler has not spilled any records"); + } + + // close the writers and remember how many bytes their last blocks hold; + // those counts are passed to the readers created below + recordsOutFile.close(); + keysOutFile.close(); + final int lastBlockBytesKeys = keysOutFile.getBytesInLatestSegment(); + final int lastBlockBytesRecords = recordsOutFile.getBytesInLatestSegment(); + recordsOutFile = null; + keysOutFile = null; + + final int pagesForReaders =
Math.max(3*MIN_SEGMENTS_FOR_KEY_SPILLING, Math.min(2*MAX_SEGMENTS_FOR_KEY_SPILLING, memory.size() / 50)); + final int pagesForKeyReader = Math.min(pagesForReaders - MIN_SEGMENTS_FOR_KEY_SPILLING, MAX_SEGMENTS_FOR_KEY_SPILLING); + final int pagesForRecordReader = pagesForReaders - pagesForKeyReader; + + // grab memory for the record reader and the key reader + ArrayList<MemorySegment> memForRecordReader = new ArrayList<MemorySegment>(); + ArrayList<MemorySegment> memForKeysReader = new ArrayList<MemorySegment>(); + + for (int i = 0; i < pagesForRecordReader; i++) { + memForRecordReader.add(memory.remove(memory.size() - 1)); + } + for (int i = 0; i < pagesForKeyReader; i++) { + memForKeysReader.add(memory.remove(memory.size() - 1)); + } + + keysReader = new FileChannelInputView(ioManager.createBlockChannelReader(keysChannel), + memManager, memForKeysReader, lastBlockBytesKeys); + InputViewIterator<Tuple> keyIterator = new InputViewIterator<Tuple>(keysReader, keySerializer); + + keySorter = new UnilateralSortMerger<Tuple>(memManager, memory, ioManager, + keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false); + + // wait for the sorter to sort the keys + MutableObjectIterator<Tuple> result; + try { + result = keySorter.getIterator(); + } catch (InterruptedException e) { + // restore the interrupt flag so that callers further up can still observe the interruption + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for the large record keys to be sorted.", e); + } + + recordsReader = new SeekableFileChannelInputView(ioManager, recordsChannel, memManager, memForRecordReader, lastBlockBytesRecords); + + return new FetchingIterator<T>(serializer, result, recordsReader, keySerializer, numKeyFields); + } + + /** + * Closes all structures and deletes all temporary files. + * Even in the presence of failures, this method will try and continue closing + * files and deleting temporary files. + * + * @throws IOException Thrown if an error occurred while closing/deleting the files. + */ + public void close() throws IOException { + + // we go on closing and deleting files in the presence of failures.
+ // we remember the first exception to occur and re-throw it later + Throwable ex = null; + + synchronized (this) { + + if (closed) { + return; + } + closed = true; + + // close the writers + if (recordsOutFile != null) { + try { + recordsOutFile.close(); + recordsOutFile = null; + } catch (Throwable t) { + LOG.error("Cannot close the large records spill file.", t); + ex = ex == null ? t : ex; + } + } + if (keysOutFile != null) { + try { + keysOutFile.close(); + keysOutFile = null; + } catch (Throwable t) { + LOG.error("Cannot close the large records key spill file.", t); + ex = ex == null ? t : ex; + } + } + + // close the readers + if (recordsReader != null) { + try { + recordsReader.close(); + recordsReader = null; + } catch (Throwable t) { + LOG.error("Cannot close the large records reader.", t); + ex = ex == null ? t : ex; + } + } + if (keysReader != null) { + try { + keysReader.close(); + keysReader = null; + } catch (Throwable t) { + LOG.error("Cannot close the large records key reader.", t); + ex = ex == null ? t : ex; + } + } + + // delete the spill files + if (recordsChannel != null) { + try { + ioManager.deleteChannel(recordsChannel); + recordsChannel = null; + } catch (Throwable t) { + LOG.error("Cannot delete the large records spill file.", t); + ex = ex == null ? t : ex; + } + } + if (keysChannel != null) { + try { + ioManager.deleteChannel(keysChannel); + keysChannel = null; + } catch (Throwable t) { + LOG.error("Cannot delete the large records key spill file.", t); + ex = ex == null ? t : ex; + } + } + + // close the key sorter + if (keySorter != null) { + try { + keySorter.close(); + keySorter = null; + } catch (Throwable t) { + LOG.error("Cannot properly dispose the key sorter and clean up its temporary files.", t); + ex = ex == null ? 
t : ex; + } + } + + memManager.release(memory); + + recordCounter = 0; + } + + // re-throw the exception, if necessary + if (ex != null) { + throw new IOException("An error occurred cleaning up spill files in the large record handler.", ex); + } + } + + // -------------------------------------------------------------------------------------------- + + public boolean hasData() { + return recordCounter > 0; + } + + // -------------------------------------------------------------------------------------------- + + private static TypeSerializer<Object> createSerializer(Object key, int pos) { + if (key == null) { + throw new NullKeyFieldException(pos); + } + try { + TypeInformation<Object> info = TypeExtractor.getForObject(key); + return info.createSerializer(); + } + catch (Throwable t) { + throw new RuntimeException("Could not create key serializer for type " + key.getClass().getName(), t); + } + } + + private static ClassLoader getClassLoader(Object[] objects) { + final ClassLoader appCl = LargeRecordHandler.class.getClassLoader(); + + for (Object o : objects) { + if (o != null && o.getClass().getClassLoader() != appCl) { + return o.getClass().getClassLoader(); + } + } + + return appCl; + } + + private static final class FetchingIterator<T> implements MutableObjectIterator<T> { + + private final TypeSerializer<T> serializer; + + private final MutableObjectIterator<Tuple> tupleInput; + + private final SeekableFileChannelInputView recordsInputs; + + private Tuple value; + + private final int pointerPos; + + + public FetchingIterator(TypeSerializer<T> serializer, MutableObjectIterator<Tuple> tupleInput, + SeekableFileChannelInputView recordsInputs, TypeSerializer<Tuple> tupleSerializer, int pointerPos) { + this.serializer = serializer; + this.tupleInput = tupleInput; + this.recordsInputs = recordsInputs; + this.pointerPos = pointerPos; + + this.value = tupleSerializer.createInstance(); + } + + @Override + public T next(T reuse) throws IOException { + Tuple value = tupleInput.next(this.value);
+ if (value != null) { + this.value = value; + long pointer = value.<Long>getField(pointerPos); + + recordsInputs.seek(pointer); + return serializer.deserialize(recordsInputs); + } else { + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java new file mode 100644 index 0000000..be8511b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/SortBuffer.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView; +import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource; +import org.apache.flink.util.MutableObjectIterator; + +/** An in-memory sorter that serializes records into a list of memory segments and keeps a separate index over them. Each index entry holds an 8-byte pointer to the record's serialized data, followed by numKeyBytes bytes of the record's normalized key (zero bytes when normalized keys are not supported). compare() and swap() operate on index entries only; the record data is never moved. */ public class SortBuffer<T> implements InMemorySorter<T> { + + /* size in bytes of the record pointer (a long) at the start of each index entry */ private static final int OFFSET_LEN = 8; + + /* normalized key byte limit used by the three-argument constructor */ private static final int DEFAULT_MAX_NORMALIZED_KEY_LEN = 16; + + /* normalized key bytes allowed per flat comparator of the key */ private static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8; + + /* fewest memory segments the constructor accepts */ private static final int MIN_REQUIRED_BUFFERS = 3; + + // ------------------------------------------------------------------------ + // Members + // ------------------------------------------------------------------------ + + /* scratch space of one index entry, used by swap() */ private final byte[] swapBuffer; + + private final TypeSerializer<T> serializer; + + private final TypeComparator<T> comparator; + + /* appends serialized records to recordBufferSegments, drawing new segments from freeMemory */ private final SimpleCollectingOutputView recordCollector; + + private final RandomAccessInputView recordBuffer; + + /* second read view, so that compareRecords() can position two records independently */ private final RandomAccessInputView recordBufferForComparison; + + private MemorySegment currentSortIndexSegment; + + /* segments not (yet) used by the sort index or the record data */ private final ArrayList<MemorySegment> freeMemory; + + private final ArrayList<MemorySegment> sortIndex; + + private final ArrayList<MemorySegment> recordBufferSegments; + + /* offset in the record data at which the next record will be serialized */ private long currentDataBufferOffset; + + private long sortIndexBytes; + + private int currentSortIndexOffset; + + private int numRecords; + + private final int numKeyBytes; + + private final int indexEntrySize; + + private final int indexEntriesPerSegment; + + /* byte offset of the last index entry that fits into one segment */ private final int lastIndexEntryOffset; + 
+ private final int segmentSize; + + private final int totalNumBuffers; + + /* true if the normalized key is not just a prefix of the key, so compare() can decide on the normalized key alone */ private final boolean normalizedKeyFullyDetermines; + + /* false if the comparator asks for normalized key comparisons to be inverted */ private final boolean useNormKeyUninverted; + + + // ------------------------------------------------------------------------- + // Constructors / Destructors + // ------------------------------------------------------------------------- + + /** Creates a sort buffer over the given memory, using at most DEFAULT_MAX_NORMALIZED_KEY_LEN normalized key bytes per record. */ public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory) { + this(serializer, comparator, memory, DEFAULT_MAX_NORMALIZED_KEY_LEN); + } + + /** Creates a sort buffer over the given memory segments. If memory is an ArrayList, it is adopted as the internal free-segment list without copying. @param maxNormalizedKeyBytes upper bound on the normalized key bytes stored per index entry @throws NullPointerException if serializer, comparator or memory is null @throws IllegalArgumentException if maxNormalizedKeyBytes is negative or fewer than MIN_REQUIRED_BUFFERS segments are given */ public SortBuffer(TypeSerializer<T> serializer, TypeComparator<T> comparator, + List<MemorySegment> memory, int maxNormalizedKeyBytes) + { + if (serializer == null || comparator == null || memory == null) { + throw new NullPointerException(); + } + if (maxNormalizedKeyBytes < 0) { + throw new IllegalArgumentException("Maximal number of normalized key bytes must not be negative."); + } + + this.serializer = serializer; + this.comparator = comparator; + this.useNormKeyUninverted = !comparator.invertNormalizedKey(); + + // check the size of the first buffer and record it. all further buffers must have the same size. 
+ // the size must also be a power of 2 (NOTE(review): neither requirement is checked here; only the size of the first segment is read) + this.totalNumBuffers = memory.size(); + if (this.totalNumBuffers < MIN_REQUIRED_BUFFERS) { + throw new IllegalArgumentException("Normalized-Key sorter requires at least " + MIN_REQUIRED_BUFFERS + " memory buffers."); + } + this.segmentSize = memory.get(0).size(); + + if (memory instanceof ArrayList<?>) { + this.freeMemory = (ArrayList<MemorySegment>) memory; /* no copy: the caller's list becomes the free list */ + } + else { + this.freeMemory = new ArrayList<MemorySegment>(memory.size()); + this.freeMemory.addAll(memory); + } + + // create the buffer collections + this.sortIndex = new ArrayList<MemorySegment>(16); + this.recordBufferSegments = new ArrayList<MemorySegment>(16); + + // the views for the record collections + this.recordCollector = new SimpleCollectingOutputView(this.recordBufferSegments, + new ListMemorySegmentSource(this.freeMemory), this.segmentSize); + this.recordBuffer = new RandomAccessInputView(this.recordBufferSegments, this.segmentSize); + this.recordBufferForComparison = new RandomAccessInputView(this.recordBufferSegments, this.segmentSize); + + // set up normalized key characteristics + if (this.comparator.supportsNormalizedKey()) { + // compute the max normalized key length + int numPartialKeys; + try { + numPartialKeys = this.comparator.getFlatComparators().length; + } catch (Throwable t) { + numPartialKeys = 1; /* a comparator that cannot report flat comparators counts as a single key */ + } + + int maxLen = Math.min(maxNormalizedKeyBytes, MAX_NORMALIZED_KEY_LEN_PER_ELEMENT * numPartialKeys); + + this.numKeyBytes = Math.min(this.comparator.getNormalizeKeyLen(), maxLen); + this.normalizedKeyFullyDetermines = !this.comparator.isNormalizedKeyPrefixOnly(this.numKeyBytes); + } + else { + this.numKeyBytes = 0; + this.normalizedKeyFullyDetermines = false; + } + + // compute the index entry size and limits + this.indexEntrySize = this.numKeyBytes + OFFSET_LEN; + this.indexEntriesPerSegment = segmentSize / this.indexEntrySize; + this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize; + this.swapBuffer = new 
byte[this.indexEntrySize]; + + // set to initial state + this.currentSortIndexSegment = nextMemorySegment(); + this.sortIndex.add(this.currentSortIndexSegment); + } + + // ------------------------------------------------------------------------- + // Memory Segment + // ------------------------------------------------------------------------- + + /** + * Resets the sort buffer back to the state where it is empty. All contained data is discarded. + */ + @Override + public void reset() { + // reset all offsets + this.numRecords = 0; + this.currentSortIndexOffset = 0; + this.currentDataBufferOffset = 0; + this.sortIndexBytes = 0; + + // return all memory + this.freeMemory.addAll(this.sortIndex); + this.freeMemory.addAll(this.recordBufferSegments); + this.sortIndex.clear(); + this.recordBufferSegments.clear(); + + // grab first buffers + this.currentSortIndexSegment = nextMemorySegment(); + this.sortIndex.add(this.currentSortIndexSegment); + this.recordCollector.reset(); + } + + /** + * Checks whether the buffer is empty. + * + * @return True, if no record is contained, false otherwise. + */ + @Override + public boolean isEmpty() { + return this.numRecords == 0; + } + + /** + * Collects all memory segments from this sorter. + * + * @return All memory segments from this sorter. This is the internal free-segment list itself, not a copy. + */ + @Override + public List<MemorySegment> dispose() { + this.freeMemory.addAll(this.sortIndex); + this.freeMemory.addAll(this.recordBufferSegments); + + this.recordBufferSegments.clear(); + this.sortIndex.clear(); + + return this.freeMemory; + } + + /** + * Gets the total capacity of this sorter, in bytes. + * + * @return The sorter's total capacity. + */ + @Override + public long getCapacity() { + return ((long) this.totalNumBuffers) * this.segmentSize; + } + + /** + * Gets the number of bytes currently occupied in this sorter. + * + * @return The number of bytes occupied. 
+ */ + @Override + public long getOccupancy() { + return this.currentDataBufferOffset + this.sortIndexBytes; /* NOTE(review): sortIndexBytes grows only when write() takes additional index segments, so the first index segment is not counted */ + } + + // ------------------------------------------------------------------------- + // Retrieving and Writing + // ------------------------------------------------------------------------- + + /** + * Gets the record at the given logical position. + * + * @param reuse The target object to deserialize the record into. + * @param logicalPosition The logical position of the record. + * @return The record, as returned by the serializer for the given reuse object. + * @throws IndexOutOfBoundsException Thrown, if the position is outside [0, size()). + * @throws IOException Thrown, if an exception occurred during deserialization. + */ + @Override + public T getRecord(T reuse, int logicalPosition) throws IOException { + return getRecordFromBuffer(reuse, readPointer(logicalPosition)); + } + + /** + * Writes a given record to this sort buffer. The written record will be appended and take + * the last logical position. + * + * @param record The record to be written. + * @return True, if the record was successfully written, false, if the sort buffer was full. + * @throws IOException Thrown, if an error occurred while serializing the record into the buffers. + * NOTE(review): if serialization runs out of memory, the partial record's bytes stay in the data buffers and currentDataBufferOffset is not advanced; callers appear to be expected to reset() before writing again. 
+ */ + @Override + public boolean write(T record) throws IOException { + //check whether we need a new memory segment for the sort index + if (this.currentSortIndexOffset > this.lastIndexEntryOffset) { + if (memoryAvailable()) { + this.currentSortIndexSegment = nextMemorySegment(); + this.sortIndex.add(this.currentSortIndexSegment); + this.currentSortIndexOffset = 0; + this.sortIndexBytes += this.segmentSize; + } else { + return false; + } + } + + // add the pointer and the normalized key + this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, this.currentDataBufferOffset); + if(this.numKeyBytes != 0) { + this.comparator.putNormalizedKey(record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN, this.numKeyBytes); + } + + // serialize the record into the data buffers + try { + this.serializer.serialize(record, this.recordCollector); + this.currentSortIndexOffset += this.indexEntrySize; + this.currentDataBufferOffset = this.recordCollector.getCurrentOffset(); + this.numRecords++; + return true; + } catch (EOFException eofex) { /* the data buffers are full; the index entry is not committed because currentSortIndexOffset and numRecords stay unchanged */ + return false; + } + } + + // ------------------------------------------------------------------------ + // Access Utilities + // ------------------------------------------------------------------------ + + /** Returns the record pointer stored in the index entry at the given logical position. @throws IndexOutOfBoundsException if the position is outside [0, numRecords) */ private final long readPointer(int logicalPosition) { + if (logicalPosition < 0 | logicalPosition >= this.numRecords) { + throw new IndexOutOfBoundsException(); + } + + final int bufferNum = logicalPosition / this.indexEntriesPerSegment; + final int segmentOffset = logicalPosition % this.indexEntriesPerSegment; + + return this.sortIndex.get(bufferNum).getLong(segmentOffset * this.indexEntrySize); + } + + /** Deserializes the record whose serialized data starts at the given pointer. */ private final T getRecordFromBuffer(T reuse, long pointer) throws IOException { + this.recordBuffer.setReadPosition(pointer); + return this.serializer.deserialize(reuse, this.recordBuffer); + } + + /** Compares the two serialized records at the given pointers with the comparator, using two independent read views. An IOException is rethrown as a RuntimeException. */ private final int compareRecords(long pointer1, long pointer2) { + 
this.recordBuffer.setReadPosition(pointer1); + this.recordBufferForComparison.setReadPosition(pointer2); + + try { + return this.comparator.compareSerialized(this.recordBuffer, this.recordBufferForComparison); + } catch (IOException ioex) { + throw new RuntimeException("Error comparing two records.", ioex); + } + } + + private final boolean memoryAvailable() { + return !this.freeMemory.isEmpty(); + } + + /* removes and returns the last segment of the free list; must not be called when the free list is empty */ private final MemorySegment nextMemorySegment() { + return this.freeMemory.remove(this.freeMemory.size() - 1); + } + + // ------------------------------------------------------------------------- + // Indexed Sorting + // ------------------------------------------------------------------------- + + /** Compares the index entries at logical positions i and j by their normalized key bytes (negated if the comparator inverts normalized keys). Only if those bytes are equal and the normalized key is just a prefix are the full serialized records compared. */ @Override + public int compare(int i, int j) { + final int bufferNumI = i / this.indexEntriesPerSegment; + final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; + + final int bufferNumJ = j / this.indexEntriesPerSegment; + final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; + + final MemorySegment segI = this.sortIndex.get(bufferNumI); + final MemorySegment segJ = this.sortIndex.get(bufferNumJ); + + int val = MemorySegment.compare(segI, segJ, segmentOffsetI + OFFSET_LEN, segmentOffsetJ + OFFSET_LEN, this.numKeyBytes); + + if (val != 0 || this.normalizedKeyFullyDetermines) { + return this.useNormKeyUninverted ? 
val : -val; + } + + final long pointerI = segI.getLong(segmentOffsetI); + final long pointerJ = segJ.getLong(segmentOffsetJ); + + return compareRecords(pointerI, pointerJ); + } + + /** Swaps the complete index entries (pointer and normalized key) at logical positions i and j; the record data is not moved. */ @Override + public void swap(int i, int j) { + final int bufferNumI = i / this.indexEntriesPerSegment; + final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; + + final int bufferNumJ = j / this.indexEntriesPerSegment; + final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; + + final MemorySegment segI = this.sortIndex.get(bufferNumI); + final MemorySegment segJ = this.sortIndex.get(bufferNumJ); + + MemorySegment.swapBytes(segI, segJ, this.swapBuffer, segmentOffsetI, segmentOffsetJ, this.indexEntrySize); + } + + @Override + public int size() { + return this.numRecords; + } + + // ------------------------------------------------------------------------- + + /** + * Gets an iterator over all records in this buffer in their logical order. The record count is captured when the iterator is created; deserialization errors are rethrown as RuntimeExceptions. + * + * @return An iterator returning the records in their logical order. 
+ */ + @Override + public final MutableObjectIterator<T> getIterator() { + return new MutableObjectIterator<T>() + { + private final int size = size(); + private int current = 0; + + private int currentSegment = 0; + private int currentOffset = 0; + + private MemorySegment currentIndexSegment = sortIndex.get(0); + + @Override + public T next(T target) + { + if (this.current < this.size) { + this.current++; + if (this.currentOffset > lastIndexEntryOffset) { + this.currentOffset = 0; + this.currentIndexSegment = sortIndex.get(++this.currentSegment); + } + + long pointer = this.currentIndexSegment.getLong(this.currentOffset); + this.currentOffset += indexEntrySize; + + try { + return getRecordFromBuffer(target, pointer); + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + else { + return null; + } + } + }; + } + + // ------------------------------------------------------------------------ + // Writing to a DataOutputView + // ------------------------------------------------------------------------ + + /** + * Writes the records in this buffer in their logical order to the given output. Records are copied in their serialized form (serializer.copy), without deserializing them into objects. + * + * @param output The output view to write the records to. + * @throws IOException Thrown, if an I/O exception occurred writing to the output view. 
+ */ + @Override + public void writeToOutput(final ChannelWriterOutputView output) throws IOException { + int recordsLeft = this.numRecords; + int currentMemSeg = 0; + while (recordsLeft > 0) + { + final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++); + int offset = 0; + // check whether we have a full or partially full segment + if (recordsLeft >= this.indexEntriesPerSegment) { + // full segment + for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) { + final long pointer = currentIndexSegment.getLong(offset); + this.recordBuffer.setReadPosition(pointer); + this.serializer.copy(this.recordBuffer, output); + + } + recordsLeft -= this.indexEntriesPerSegment; + } else { + // partially filled segment + for (; recordsLeft > 0; recordsLeft--, offset += this.indexEntrySize) + { + final long pointer = currentIndexSegment.getLong(offset); + this.recordBuffer.setReadPosition(pointer); + this.serializer.copy(this.recordBuffer, output); + } + } + } + } + + /** + * Writes a subset of the records in this buffer in their logical order to the given output. + * + * @param output The output view to write the records to. + * @param start The logical start position of the subset. + * @param num The number of elements to write. + * NOTE(review): start and num are not bounds-checked against size(). + * @throws IOException Thrown, if an I/O exception occurred writing to the output view. 
+ */ + @Override + public void writeToOutput(final ChannelWriterOutputView output, final int start, int num) throws IOException { + int currentMemSeg = start / this.indexEntriesPerSegment; + int offset = (start % this.indexEntriesPerSegment) * this.indexEntrySize; + + while (num > 0) + { + final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++); + // check whether we have a full or partially full segment + if (num >= this.indexEntriesPerSegment && offset == 0) { + // full segment + for (;offset <= this.lastIndexEntryOffset; offset += this.indexEntrySize) { + final long pointer = currentIndexSegment.getLong(offset); + this.recordBuffer.setReadPosition(pointer); + this.serializer.copy(this.recordBuffer, output); + } + num -= this.indexEntriesPerSegment; + } else { + // partially filled segment + for (; num > 0 && offset <= this.lastIndexEntryOffset; num--, offset += this.indexEntrySize) + { + final long pointer = currentIndexSegment.getLong(offset); + this.recordBuffer.setReadPosition(pointer); + this.serializer.copy(this.recordBuffer, output); + } + } + offset = 0; /* only the first visited segment may start mid-way */ + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48256560/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java index dabf9bd..51cc1cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import 
java.util.List; @@ -63,68 +64,46 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // Constants // ------------------------------------------------------------------------ - /** - * Logging. - */ + /** Logging. */ private static final Logger LOG = LoggerFactory.getLogger(UnilateralSortMerger.class); - /** - * Fix length records with a length below this threshold will be in-place sorted, if possible. - */ + /** Fixed-length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - /** - * The minimal number of buffers to use by the writers. - */ + /** The minimal number of buffers to use by the writers. */ protected static final int MIN_NUM_WRITE_BUFFERS = 2; - /** - * The maximal number of buffers to use by the writers. - */ - protected static final int MAX_NUM_WRITE_BUFFERS = 64; + /** The maximal number of buffers to use by the writers. The large record handler may get up to twice this many. */ + protected static final int MAX_NUM_WRITE_BUFFERS = 4; - /** - * The minimum number of segments that are required for the sort to operate. - */ + /** The minimum number of segments that are required for the sort to operate. */ protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10; // ------------------------------------------------------------------------ // Threads // ------------------------------------------------------------------------ - /** - * The thread that reads the input channels into buffers and passes them on to the merger. - */ + /** The thread that reads the input channels into buffers and passes them on to the merger. */ private final ThreadBase<E> readThread; - /** - * The thread that merges the buffer handed from the reading thread. - */ + /** The thread that merges the buffer handed from the reading thread. */ private final ThreadBase<E> sortThread; - /** - * The thread that handles spilling to secondary storage. - */ + /** The thread that handles spilling to secondary storage. 
*/ private final ThreadBase<E> spillThread; // ------------------------------------------------------------------------ // Memory // ------------------------------------------------------------------------ - /** - * The memory segments used first for sorting and later for reading/pre-fetching - * during the external merge. - */ - protected final ArrayList<MemorySegment> sortReadMemory; + /** The memory segments used first for sorting and later for reading/pre-fetching + * during the external merge. This is the list handed to the constructor; write and large-record buffers are removed from it. */ + protected final List<MemorySegment> sortReadMemory; - /** - * The memory segments used to stage data to be written. - */ - protected final ArrayList<MemorySegment> writeMemory; + /** The memory segments used to stage data to be written. */ + protected final List<MemorySegment> writeMemory; - /** - * The memory manager through which memory is allocated and released. - */ + /** The memory manager through which memory is allocated and released. */ protected final MemoryManager memoryManager; // ------------------------------------------------------------------------ @@ -132,6 +111,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // ------------------------------------------------------------------------ /** + * The handler for large records, which do not go through the in-memory sorter as a whole, but + * directly go to disk. Null if no memory is assigned to large record handling. + */ + private final LargeRecordHandler<E> largeRecordHandler; + + /** * Collection of all currently open channels, to be closed and deleted during cleanup. */ private final HashSet<FileIOChannel> openChannels; @@ -166,26 +151,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // Constructor & Shutdown // ------------------------------------------------------------------------ - /** - * Creates a new sorter that reads the data from a given reader and provides an iterator returning that - * data in a sorted manner. The memory is divided among sort buffers, write buffers and read buffers - * automatically. 
- * - * @param memoryManager The memory manager from which to allocate the memory. - * @param ioManager The I/O manager, which is used to write temporary files to disk. - * @param input The input that is sorted by this sorter. - * @param parentTask The parent task, which owns all resources used by this sorter. - * @param serializerFactory The type serializer. - * @param comparator The type comparator establishing the order relation. - * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O. - * @param maxNumFileHandles The maximum number of files to be merged at once. - * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread - * actually begins spilling data to disk. - * - * @throws IOException Thrown, if an error occurs initializing the resources for external sorting. - * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to - * perform the sort. - */ /** Creates a sorter over memoryFraction of the managed memory; the number of sort buffers is chosen automatically (-1 is passed on). */ public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, @@ -196,27 +161,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> { memoryFraction, -1, maxNumFileHandles, startSpillingFraction); } - /** - * Creates a new sorter that reads the data from a given reader and provides an iterator returning that - * data in a sorted manner. The memory is divided among sort buffers, write buffers and read buffers - * automatically. - * - * @param memoryManager The memory manager from which to allocate the memory. - * @param ioManager The I/O manager, which is used to write temporary files to disk. - * @param input The input that is sorted by this sorter. - * @param parentTask The parent task, which owns all resources used by this sorter. - * @param serializerFactory The type serializer. 
- * @param comparator The type comparator establishing the order relation. - * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O. - * @param numSortBuffers The number of distinct buffers to use creation of the initial runs. - * @param maxNumFileHandles The maximum number of files to be merged at once. - * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread - * actually begins spilling data to disk. - * - * @throws IOException Thrown, if an error occurs initializing the resources for external sorting. - * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to - * perform the sort. - */ /** Creates a sorter over memoryFraction of the managed memory with spilling and large record handling enabled; a numSortBuffers value below 1 lets the sorter choose the number of sort buffers. */ public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, @@ -225,37 +169,42 @@ public class UnilateralSortMerger<E> implements Sorter<E> { throws IOException, MemoryAllocationException { this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false); + memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true); /* noSpillingMemory = false, handleLargeRecords = true */ } - /** - * Internal constructor and constructor for subclasses that want to circumvent the spilling. - * - * @param memoryManager The memory manager from which to allocate the memory. - * @param ioManager The I/O manager, which is used to write temporary files to disk. - * @param input The input that is sorted by this sorter. - * @param parentTask The parent task, which owns all resources used by this sorter. - * @param serializerFactory The type serializer. - * @param comparator The type comparator establishing the order relation. - * @param memoryFraction The fraction of memory dedicated to sorting, merging and I/O. 
- * @param numSortBuffers The number of distinct buffers to use creation of the initial runs. - * @param maxNumFileHandles The maximum number of files to be merged at once. - * @param startSpillingFraction The faction of the buffers that have to be filled before the spilling thread - * actually begins spilling data to disk. - * @param noSpillingMemory When set to true, no memory will be allocated for writing and no spilling thread - * will be spawned. - * - * @throws IOException Thrown, if an error occurs initializing the resources for external sorting. - * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to - * perform the sort. - */ - protected UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, + public UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, + IOManager ioManager, + MutableObjectIterator<E> input, AbstractInvokable parentTask, + TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, + int numSortBuffers, int maxNumFileHandles, + float startSpillingFraction, boolean handleLargeRecords) + throws IOException + { + this(memoryManager, memory, ioManager, input, parentTask, serializerFactory, comparator, + numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords); + } + + protected UnilateralSortMerger(MemoryManager memoryManager, + IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction, boolean noSpillingMemory) + float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords) throws IOException, MemoryAllocationException { + this(memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)), + ioManager, input, parentTask, serializerFactory, comparator, 
+ numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true); + } + + protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, + IOManager ioManager, + MutableObjectIterator<E> input, AbstractInvokable parentTask, + TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, + int numSortBuffers, int maxNumFileHandles, + float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords) + throws IOException + { // sanity checks if (memoryManager == null | (ioManager == null && !noSpillingMemory) | serializerFactory == null | comparator == null) { throw new NullPointerException(); @@ -270,7 +219,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { this.memoryManager = memoryManager; // adjust the memory quotas to the page size - final int numPagesTotal = memoryManager.computeNumberOfPages(memoryFraction); + final int numPagesTotal = memory.size(); if (numPagesTotal < MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) { throw new IllegalArgumentException("Too little memory provided to sorter to perform task. " + @@ -280,43 +229,52 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // determine how many buffers to use for writing final int numWriteBuffers; - if (noSpillingMemory) { + final int numLargeRecordBuffers; + + if (noSpillingMemory && !handleLargeRecords) { numWriteBuffers = 0; - } else { - // determine how many buffers we have when we do a full mere with maximal fan-in - final int minBuffers = MIN_NUM_WRITE_BUFFERS + maxNumFileHandles; - final int desiredBuffers = MIN_NUM_WRITE_BUFFERS + 2 * maxNumFileHandles; + numLargeRecordBuffers = 0; + } + else { + int numConsumers = (noSpillingMemory ? 0 : 1) + (handleLargeRecords ? 
2 : 0); /* the large record handler is weighted as two consumers */ - if (desiredBuffers > numPagesTotal) { - numWriteBuffers = MIN_NUM_WRITE_BUFFERS; - if (minBuffers > numPagesTotal) { - maxNumFileHandles = numPagesTotal - MIN_NUM_WRITE_BUFFERS; - if (LOG.isDebugEnabled()) { - LOG.debug("Reducing maximal merge fan-in to " + maxNumFileHandles + " due to limited memory availability during merge"); - } + // determine how many buffers we need when we do a full merge with maximal fan-in + final int minBuffersForMerging = maxNumFileHandles + numConsumers * MIN_NUM_WRITE_BUFFERS; + + if (minBuffersForMerging > numPagesTotal) { /* not enough pages for a merge with full fan-in: give each consumer the minimum and lower the fan-in */ + numWriteBuffers = noSpillingMemory ? 0 : MIN_NUM_WRITE_BUFFERS; + numLargeRecordBuffers = handleLargeRecords ? 2*MIN_NUM_WRITE_BUFFERS : 0; + + maxNumFileHandles = numPagesTotal - numConsumers * MIN_NUM_WRITE_BUFFERS; + if (LOG.isDebugEnabled()) { + LOG.debug("Reducing maximal merge fan-in to " + maxNumFileHandles + " due to limited memory availability during merge"); } } else { // we are free to choose. make sure that we do not eat up too much memory for writing - final int designatedWriteBuffers = numPagesTotal / (maxNumFileHandles + 1); - final int fractional = numPagesTotal / 64; - final int maximal = numPagesTotal - MIN_NUM_SORT_MEM_SEGMENTS; + final int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100); /* about 1% of all pages per consumer */ - numWriteBuffers = Math.max(MIN_NUM_WRITE_BUFFERS, // at least the lower bound - Math.min(Math.min(MAX_NUM_WRITE_BUFFERS, maximal), // at most the lower of the upper bounds - Math.min(designatedWriteBuffers, fractional))); // the lower of the average + if (fractionalAuxBuffers >= MAX_NUM_WRITE_BUFFERS) { + numWriteBuffers = noSpillingMemory ? 0 : MAX_NUM_WRITE_BUFFERS; + numLargeRecordBuffers = handleLargeRecords ? 2*MAX_NUM_WRITE_BUFFERS : 0; + } + else { + numWriteBuffers = noSpillingMemory ? 0 : + Math.max(MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers); // at least the lower bound + + numLargeRecordBuffers = handleLargeRecords ? 
+ Math.max(2*MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers) // at least the lower bound + : 0; + } } } - final int sortMemPages = numPagesTotal - numWriteBuffers; + final int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers; /* all remaining pages are used for sorting */ final long sortMemory = ((long) sortMemPages) * memoryManager.getPageSize(); // decide how many sort buffers to use if (numSortBuffers < 1) { - if (sortMemory > 96 * 1024 * 1024) { - numSortBuffers = 3; - } - else if (sortMemPages >= 2 * MIN_NUM_SORT_MEM_SEGMENTS) { + if (sortMemory > 100 * 1024 * 1024) { /* more than 100 MiB of sort memory */ numSortBuffers = 2; } else { @@ -326,27 +284,42 @@ public class UnilateralSortMerger<E> implements Sorter<E> { final int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers; if (LOG.isDebugEnabled()) { - LOG.debug("Instantiating sorter with " + sortMemPages + " pages of sorting memory (=" + - sortMemory + " bytes total) divided over " + numSortBuffers + " sort buffers (" + - numSegmentsPerSortBuffer + " pages per buffer). Using " + numWriteBuffers + - " buffers for writing sorted results and merging maximally " + maxNumFileHandles + - " streams at once."); + LOG.debug(String.format("Instantiating sorter with %d pages of sorting memory (=" + + "%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d" + + " buffers for writing sorted results and merging maximally %d streams at once. 
" + + "Using %d memory segments for large record spilling.", + sortMemPages, sortMemory, numSortBuffers, numSegmentsPerSortBuffer, numWriteBuffers, + maxNumFileHandles, numLargeRecordBuffers)); } + + this.sortReadMemory = memory; /* the caller's list itself; the removals below shrink it */ this.writeMemory = new ArrayList<MemorySegment>(numWriteBuffers); - this.sortReadMemory = new ArrayList<MemorySegment>(sortMemPages); - // allocate the memory - memoryManager.allocatePages(parentTask, this.sortReadMemory, sortMemPages); + final TypeSerializer<E> serializer = serializerFactory.getSerializer(); + + // move some pages from the sort memory to the write memory if (numWriteBuffers > 0) { - memoryManager.allocatePages(parentTask, this.writeMemory, numWriteBuffers); + for (int i = 0; i < numWriteBuffers; i++) { + this.writeMemory.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1)); + } + } + if (numLargeRecordBuffers > 0) { + List<MemorySegment> mem = new ArrayList<MemorySegment>(); + for (int i = 0; i < numLargeRecordBuffers; i++) { + mem.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1)); + } + + /* NOTE(review): the sanity check only requires ioManager when spilling is enabled, so it may be null here if noSpillingMemory is set; confirm that LargeRecordHandler tolerates this */ this.largeRecordHandler = new LargeRecordHandler<E>(serializer, comparator.duplicate(), + ioManager, memoryManager, mem, parentTask, maxNumFileHandles); + } + else { + this.largeRecordHandler = null; + } // circular queues pass buffers between the threads final CircularQueues<E> circularQueues = new CircularQueues<E>(); - final TypeSerializer<E> serializer = serializerFactory.getSerializer(); - // allocate the sort buffers and fill empty queue with them final Iterator<MemorySegment> segments = this.sortReadMemory.iterator(); for (int i = 0; i < numSortBuffers; i++) @@ -390,8 +363,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> { this.openChannels = new HashSet<FileIOChannel>(64); // start the thread that reads the input channels - this.readThread = getReadingThread(exceptionHandler, input, circularQueues, parentTask, - serializer, ((long) (startSpillingFraction * sortMemory))); + this.readThread = 
getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler, + parentTask, serializer, ((long) (startSpillingFraction * sortMemory))); // start the thread that sorts the buffers this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask); @@ -543,6 +516,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> { } catch (Throwable t) {} } + + try { + if (this.largeRecordHandler != null) { + this.largeRecordHandler.close(); + } + } catch (Throwable t) {} } } @@ -574,11 +553,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> { * them into a queue. */ protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, - MutableObjectIterator<E> reader, CircularQueues<E> queues, AbstractInvokable parentTask, + MutableObjectIterator<E> reader, CircularQueues<E> queues, + LargeRecordHandler<E> largeRecordHandler, AbstractInvokable parentTask, TypeSerializer<E> serializer, long startSpillingBytes) { - return new ReadingThread<E>(exceptionHandler, reader, queues, serializer.createInstance(), - parentTask, startSpillingBytes); + return new ReadingThread<E>(exceptionHandler, reader, queues, largeRecordHandler, + serializer.createInstance(),parentTask, startSpillingBytes); } protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, @@ -853,19 +833,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> { */ protected static class ReadingThread<E> extends ThreadBase<E> { - /** - * The input channels to read from. - */ + /** The input channels to read from. */ private final MutableObjectIterator<E> reader; - /** - * The fraction of the buffers that must be full before the spilling starts. - */ + private final LargeRecordHandler<E> largeRecords; + + /** The fraction of the buffers that must be full before the spilling starts. 
*/ private final long startSpillingBytes; - /** - * The object into which the thread reads the data from the input. - */ + /** The object into which the thread reads the data from the input. */ private final E readTarget; /** @@ -878,7 +854,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { */ public ReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, CircularQueues<E> queues, - E readTarget, + LargeRecordHandler<E> largeRecordsHandler, E readTarget, AbstractInvokable parentTask, long startSpillingBytes) { super(exceptionHandler, "SortMerger Reading Thread", queues, parentTask); @@ -887,14 +863,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> { this.reader = reader; this.readTarget = readTarget; this.startSpillingBytes = startSpillingBytes; + this.largeRecords = largeRecordsHandler; } /** * The entry point for the thread. Gets a buffer for all threads and then loops as long as there is input * available. */ - public void go() throws IOException - { + public void go() throws IOException { + final MutableObjectIterator<E> reader = this.reader; E current = this.readTarget; @@ -921,12 +898,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { element = this.queues.empty.take(); } catch (InterruptedException iex) { - if (isRunning()) { - LOG.error("Reading thread was interrupted (without being shut down) while grabbing a buffer. 
" + - "Retrying to grab buffer..."); - } else { - return; - } + throw new IOException(iex); } } @@ -943,7 +915,13 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // write the last leftover pair, if we have one if (leftoverRecord != null) { if (!buffer.write(leftoverRecord)) { - throw new IOException("Record could not be written to empty buffer: Serialized record exceeds buffer capacity."); + if (this.largeRecords != null) { + this.largeRecords.addRecord(leftoverRecord); + } else { + throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + + buffer.getCapacity() + " bytes)."); + } + buffer.reset(); } leftoverRecord = null; } @@ -1045,6 +1023,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { this.queues.sort.add(element); } else { + buffer.reset(); this.queues.empty.add(element); } element = null; @@ -1088,8 +1067,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { /** * Entry point of the thread. */ - public void go() throws IOException - { + public void go() throws IOException { boolean alive = true; // loop as long as the thread is marked alive @@ -1113,10 +1091,17 @@ public class UnilateralSortMerger<E> implements Sorter<E> { } if (element != EOF_MARKER && element != SPILLING_MARKER) { + + if (element.buffer.size() == 0) { + element.buffer.reset(); + this.queues.empty.add(element); + continue; + } + if (LOG.isDebugEnabled()) { LOG.debug("Sorting buffer " + element.id + "."); } - + this.sorter.sort(element.buffer); if (LOG.isDebugEnabled()) { @@ -1150,9 +1135,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> { protected final List<MemorySegment> writeMemory; // memory segments for writing - protected final List<MemorySegment> sortReadMemory; // memory segments for sorting/reading + protected final List<MemorySegment> mergeReadMemory; // memory segments for sorting/reading - protected final int maxNumFileHandles; + protected final int maxFanIn; protected final int 
numWriteBuffersToCluster; @@ -1180,9 +1165,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> { this.ioManager = ioManager; this.serializer = serializer; this.comparator = comparator; - this.sortReadMemory = sortReadMemory; + this.mergeReadMemory = sortReadMemory; this.writeMemory = writeMemory; - this.maxNumFileHandles = maxNumFileHandles; + this.maxFanIn = maxNumFileHandles; this.numWriteBuffersToCluster = writeMemory.size() >= 4 ? writeMemory.size() / 2 : 1; } @@ -1203,14 +1188,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> { element = this.queues.spill.take(); } catch (InterruptedException iex) { - if (isRunning()) { - LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. " + - "Retrying to grab buffer..."); - continue; - } else { - return; - } + throw new IOException("The spilling thread was interrupted."); } + if (element == SPILLING_MARKER) { break; } @@ -1226,21 +1206,46 @@ public class UnilateralSortMerger<E> implements Sorter<E> { return; } + MutableObjectIterator<E> largeRecords = null; + + // check if we can stay in memory with the large record handler + if (cacheOnly && largeRecordHandler != null && largeRecordHandler.hasData()) { + List<MemorySegment> memoryForLargeRecordSorting = new ArrayList<MemorySegment>(); + + CircularElement<E> circElement; + while ((circElement = this.queues.empty.poll()) != null) { + memoryForLargeRecordSorting.addAll(circElement.buffer.dispose()); + } + + if (memoryForLargeRecordSorting.isEmpty()) { + cacheOnly = false; + LOG.debug("Going to disk-based merge because of large records."); + + } else { + LOG.debug("Sorting large records, to add them to in-memory merge."); + largeRecords = largeRecordHandler.finishWriteAndSortKeys(memoryForLargeRecordSorting); + } + } + // ------------------- In-Memory Merge ------------------------ if (cacheOnly) { - /* operates on in-memory segments only */ + // operates on in-memory buffers only if (LOG.isDebugEnabled()) { 
LOG.debug("Initiating in memory merge."); } - List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(cache.size()); - + List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(cache.size() + 1); + // iterate buffers and collect a set of iterators for (CircularElement<E> cached : cache) { // note: the yielded iterator only operates on the buffer heap (and disregards the stack) iterators.add(cached.buffer.getIterator()); } + if (largeRecords != null) { + iterators.add(largeRecords); + } + // release the remaining sort-buffers if (LOG.isDebugEnabled()) { LOG.debug("Releasing unused sort-buffer memory."); @@ -1252,7 +1257,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { iterators.size() == 1 ? iterators.get(0) : new MergeIterator<E>(iterators, this.comparator)); return; - } + } // ------------------- Spilling Phase ------------------------ @@ -1261,7 +1266,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // loop as long as the thread is marked alive and we do not see the final element - while (isRunning()) { + while (isRunning()) { try { element = takeNext(this.queues.spill, cache); } @@ -1322,11 +1327,50 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // clear the sort buffers, but do not return the memory to the manager, as we use it for merging disposeSortBuffers(false); + // ------------------- Merging Phase ------------------------ + // make sure we have enough memory to merge and for large record handling + List<MemorySegment> mergeReadMemory; + + if (largeRecordHandler != null && largeRecordHandler.hasData()) { + + List<MemorySegment> longRecMem; + if (channelIDs.isEmpty()) { + // only long records + longRecMem = this.mergeReadMemory; + mergeReadMemory = Collections.emptyList(); + } + else { + int maxMergedStreams = Math.min(this.maxFanIn, channelIDs.size()); + + int pagesPerStream = Math.max(MIN_NUM_WRITE_BUFFERS, + Math.min(MAX_NUM_WRITE_BUFFERS, 
this.mergeReadMemory.size() / 2 / maxMergedStreams)); + + int totalMergeReadMemory = maxMergedStreams * pagesPerStream; + + // grab the merge memory + mergeReadMemory = new ArrayList<MemorySegment>(totalMergeReadMemory); + for (int i = 0; i < totalMergeReadMemory; i++) { + mergeReadMemory.add(this.mergeReadMemory.get(i)); + } + + // the remainder of the memory goes to the long record sorter + longRecMem = new ArrayList<MemorySegment>(); + for (int i = totalMergeReadMemory; i < this.mergeReadMemory.size(); i++) { + longRecMem.add(this.mergeReadMemory.get(i)); + } + } + + largeRecords = largeRecordHandler.finishWriteAndSortKeys(longRecMem); + } + else { + mergeReadMemory = this.mergeReadMemory; + } + // merge channels until sufficient file handles are available - while (isRunning() && channelIDs.size() > this.maxNumFileHandles) { - channelIDs = mergeChannelList(channelIDs, this.sortReadMemory, this.writeMemory); + while (isRunning() && channelIDs.size() > this.maxFanIn) { + channelIDs = mergeChannelList(channelIDs, mergeReadMemory, this.writeMemory); } // from here on, we won't write again @@ -1335,7 +1379,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> { // check if we have spilled some data at all if (channelIDs.isEmpty()) { - setResultIterator(EmptyMutableObjectIterator.<E>get()); + if (largeRecords == null) { + setResultIterator(EmptyMutableObjectIterator.<E>get()); + } else { + setResultIterator(largeRecords); + } } else { if (LOG.isDebugEnabled()) { @@ -1346,10 +1394,10 @@ public class UnilateralSortMerger<E> implements Sorter<E> { List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelIDs.size()); // allocate the read memory and register it to be released - getSegmentsForReaders(readBuffers, this.sortReadMemory, channelIDs.size()); + getSegmentsForReaders(readBuffers, mergeReadMemory, channelIDs.size()); // get the readers and register them to be released - setResultIterator(getMergingIterator(channelIDs, readBuffers, 
new ArrayList<FileIOChannel>(channelIDs.size()))); + setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()), largeRecords)); } // done @@ -1361,8 +1409,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { /** * Releases the memory that is registered for in-memory sorted run generation. */ - protected final void disposeSortBuffers(boolean releaseMemory) - { + protected final void disposeSortBuffers(boolean releaseMemory) { while (!this.queues.empty.isEmpty()) { try { final InMemorySorter<?> sorter = this.queues.empty.take().buffer; @@ -1403,7 +1450,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { * @throws IOException Thrown, if the readers encounter an I/O problem. */ protected final MergeIterator<E> getMergingIterator(final List<ChannelWithBlockCount> channelIDs, - final List<List<MemorySegment>> inputSegments, List<FileIOChannel> readerList) + final List<List<MemorySegment>> inputSegments, List<FileIOChannel> readerList, MutableObjectIterator<E> largeRecords) throws IOException { // create one iterator per channel id @@ -1411,7 +1458,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { LOG.debug("Performing merge of " + channelIDs.size() + " sorted streams."); } - final List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(channelIDs.size()); + final List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(channelIDs.size() + 1); for (int i = 0; i < channelIDs.size(); i++) { final ChannelWithBlockCount channel = channelIDs.get(i); @@ -1429,6 +1476,10 @@ public class UnilateralSortMerger<E> implements Sorter<E> { channel.getBlockCount(), false); iterators.add(new ChannelReaderInputViewIterator<E>(inView, null, this.serializer)); } + + if (largeRecords != null) { + iterators.add(largeRecords); + } return new MergeIterator<E>(iterators, this.comparator); } @@ -1446,7 +1497,7 @@ public class UnilateralSortMerger<E> 
implements Sorter<E> { final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers) throws IOException { - final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxNumFileHandles)); + final double numMerges = Math.ceil(channelIDs.size() / ((double) this.maxFanIn)); final int channelsToMergePerStep = (int) Math.ceil(channelIDs.size() / numMerges); // allocate the memory for the merging step @@ -1494,7 +1545,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> { final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size()); // the list with the target iterators - final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses); + final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses, null); // create a new channel writer final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
