[FLINK-2653] [runtime] Enable object reuse in MergeIterator. This closes #1115.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a8df6d5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a8df6d5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a8df6d5 Branch: refs/heads/master Commit: 0a8df6d513fa59d650ff875bdf3a1613d0f14af5 Parents: 6891212 Author: Greg Hogan <[email protected]> Authored: Thu Sep 10 09:35:39 2015 -0400 Committer: Stephan Ewen <[email protected]> Committed: Tue Sep 29 12:21:34 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/operators/DataSinkTask.java | 3 +- .../runtime/operators/RegularPactTask.java | 4 +- .../sort/CombiningUnilateralSortMerger.java | 9 ++-- .../operators/sort/LargeRecordHandler.java | 3 +- .../runtime/operators/sort/MergeIterator.java | 47 +++++++++++++++++--- .../operators/sort/UnilateralSortMerger.java | 47 ++++++++++++++------ .../operators/ReduceTaskExternalITCase.java | 4 +- .../flink/runtime/operators/ReduceTaskTest.java | 2 +- .../CombiningUnilateralSortMergerITCase.java | 6 +-- .../operators/sort/ExternalSortITCase.java | 10 ++--- .../sort/ExternalSortLargeRecordsITCase.java | 8 ++-- .../testutils/BinaryOperatorTestBase.java | 3 +- .../operators/testutils/DriverTestBase.java | 2 +- .../testutils/UnaryOperatorTestBase.java | 2 +- .../operators/util/HashVsSortMiniBenchmark.java | 4 +- .../flink/tez/runtime/DataSinkProcessor.java | 2 +- .../org/apache/flink/tez/runtime/TezTask.java | 4 +- .../flink/test/manual/MassiveStringSorting.java | 4 +- .../test/manual/MassiveStringValueSorting.java | 4 +- .../manual/MassiveCaseClassSortingITCase.scala | 2 +- 20 files changed, 114 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 39a0a28..1002bae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -164,7 +164,8 @@ public class DataSinkTask<IT> extends AbstractInvokable { getEnvironment().getIOManager(), this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(), this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0), - this.config.getSpillingThresholdInput(0)); + this.config.getSpillingThresholdInput(0), + this.getExecutionConfig().isObjectReuseEnabled()); this.localStrategy = sorter; input1 = sorter.getIterator(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 6d35f92..89963af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -923,7 +923,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum)); + 
this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled()); // set the input to null such that it will be lazily fetched from the input strategy this.inputs[inputNum] = null; this.localStrategies[inputNum] = sorter; @@ -959,7 +959,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum)); + this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled()); cSorter.setUdfConfiguration(this.config.getStubParameters()); // set the input to null such that it will be lazily fetched from the input strategy http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java index f662a7e..855ee21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java @@ -103,11 +103,11 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, - 
double memoryFraction, int maxNumFileHandles, float startSpillingFraction) + double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled) throws IOException, MemoryAllocationException { this(combineStub, memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, -1, maxNumFileHandles, startSpillingFraction); + memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled); } /** @@ -136,11 +136,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction) + float startSpillingFraction, boolean objectReuseEnabled) throws IOException, MemoryAllocationException { super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true); + memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true, + objectReuseEnabled); this.combineStub = combineStub; } http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java index e4a99fb..518f44c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java @@ -254,7 +254,8 @@ public class LargeRecordHandler<T> { InputViewIterator<Tuple> keyIterator = new 
InputViewIterator<Tuple>(keysReader, keySerializer); keySorter = new UnilateralSortMerger<Tuple>(memManager, memory, ioManager, - keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false); + keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false, + this.executionConfig.isObjectReuseEnabled()); // wait for the sorter to sort the keys MutableObjectIterator<Tuple> result; http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java index 9da429d..0792dbf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java @@ -55,18 +55,43 @@ public class MergeIterator<E> implements MutableObjectIterator<E> { /** * Gets the next smallest element, with respect to the definition of order implied by - * the {@link TypeSerializer} provided to this iterator. This method does in fact not - * reuse the given element (which would here imply potentially expensive copying), - * but always returns a new element. + * the {@link TypeSerializer} provided to this iterator. * - * @param reuse Ignored. - * @return The next smallest element, or null, if the iterator is exhausted. + * @param reuse Object that may be reused. + * @return The next element if the iterator has another element, null otherwise. 
* * @see org.apache.flink.util.MutableObjectIterator#next(java.lang.Object) */ @Override public E next(E reuse) throws IOException { - return next(); + /* There are three ways to handle object reuse: + * 1) reuse and return the given object + * 2) ignore the given object and return a new object + * 3) exchange the given object for an existing object + * + * The first option is not available here as the return value has + * already been deserialized from the heap's top iterator. The second + * option avoids object reuse. The third option is implemented below + * by passing the given object to the heap's top iterator into which + * the next value will be deserialized. + */ + + if (this.heap.size() > 0) { + // get the smallest element + final HeadStream<E> top = this.heap.peek(); + E result = top.getHead(); + + // read an element + if (!top.nextHead(reuse)) { + this.heap.poll(); + } else { + this.heap.adjustTop(); + } + return result; + } + else { + return null; + } } /** @@ -122,6 +147,16 @@ public class MergeIterator<E> implements MutableObjectIterator<E> { return this.head; } + public boolean nextHead(E reuse) throws IOException { + if ((this.head = this.iterator.next(reuse)) != null) { + this.comparator.setReference(this.head); + return true; + } + else { + return false; + } + } + public boolean nextHead() throws IOException { if ((this.head = this.iterator.next()) != null) { this.comparator.setReference(this.head); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java index 13159d9..0fa24f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java 
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java @@ -146,6 +146,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> { */ protected volatile boolean closed; + /** + * Whether to reuse objects during deserialization. + */ + protected final boolean objectReuseEnabled; + // ------------------------------------------------------------------------ // Constructor & Shutdown // ------------------------------------------------------------------------ @@ -153,22 +158,24 @@ public class UnilateralSortMerger<E> implements Sorter<E> { public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, - double memoryFraction, int maxNumFileHandles, float startSpillingFraction) + double memoryFraction, int maxNumFileHandles, float startSpillingFraction, + boolean objectReuseEnabled) throws IOException, MemoryAllocationException { this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, -1, maxNumFileHandles, startSpillingFraction); + memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled); } public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction) + float startSpillingFraction, boolean objectReuseEnabled) throws IOException, MemoryAllocationException { this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true); + memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true, + objectReuseEnabled); } public 
UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, @@ -176,11 +183,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> { MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction, boolean handleLargeRecords) + float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException { this(memoryManager, memory, ioManager, input, parentTask, serializerFactory, comparator, - numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords); + numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords, + objectReuseEnabled); } protected UnilateralSortMerger(MemoryManager memoryManager, @@ -188,12 +196,14 @@ public class UnilateralSortMerger<E> implements Sorter<E> { MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords) + float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, + boolean objectReuseEnabled) throws IOException, MemoryAllocationException { this(memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)), ioManager, input, parentTask, serializerFactory, comparator, - numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true); + numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true, + objectReuseEnabled); } protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, @@ -201,7 +211,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> { MutableObjectIterator<E> input, AbstractInvokable parentTask, 
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords) + float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, + boolean objectReuseEnabled) throws IOException { // sanity checks @@ -216,7 +227,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> { } this.memoryManager = memoryManager; - + this.objectReuseEnabled = objectReuseEnabled; + // adjust the memory quotas to the page size final int numPagesTotal = memory.size(); @@ -1595,10 +1607,17 @@ public class UnilateralSortMerger<E> implements Sorter<E> { this.memManager.getPageSize()); // read the merged stream and write the data back - final TypeSerializer<E> serializer = this.serializer; - E rec = serializer.createInstance(); - while ((rec = mergeIterator.next(rec)) != null) { - serializer.serialize(rec, output); + if (objectReuseEnabled) { + final TypeSerializer<E> serializer = this.serializer; + E rec = serializer.createInstance(); + while ((rec = mergeIterator.next(rec)) != null) { + serializer.serialize(rec, output); + } + } else { + E rec; + while ((rec = mergeIterator.next()) != null) { + serializer.serialize(rec, output); + } } output.close(); final int numBlocksWritten = output.getBlockCount(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java index d83e92e..f59c4a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java @@ -134,7 +134,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, - 2, 0.8f); + 2, 0.8f, true); addInput(sorter.getIterator()); GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); @@ -180,7 +180,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, - 2, 0.8f); + 2, 0.8f, false); addInput(sorter.getIterator()); GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java index 964f646..cc25c99 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java @@ -128,7 +128,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, - 4, 0.8f); + 4, 0.8f, true); 
addInput(sorter.getIterator()); GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java index 75593b8..e1e2c0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java @@ -115,7 +115,7 @@ public class CombiningUnilateralSortMergerITCase { Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator, - 0.25, 64, 0.7f); + 0.25, 64, 0.7f, false); final Record rec = new Record(); rec.setField(1, new IntValue(1)); @@ -156,7 +156,7 @@ public class CombiningUnilateralSortMergerITCase { Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator, - 0.01, 64, 0.005f); + 0.01, 64, 0.005f, true); final Record rec = new Record(); rec.setField(1, new IntValue(1)); @@ -205,7 +205,7 @@ public class CombiningUnilateralSortMergerITCase { Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator, - 0.25, 2, 0.7f); + 0.25, 2, 0.7f, false); // emit data LOG.debug("emitting data"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java index 5aa9efb..9f0b3d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java @@ -119,7 +119,7 @@ public class ExternalSortITCase { Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 2, 0.9f); + (double)64/78, 2, 0.9f, true); // emit data LOG.debug("Reading and sorting data..."); @@ -172,7 +172,7 @@ public class ExternalSortITCase { Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 10, 2, 0.9f); + (double)64/78, 10, 2, 0.9f, false); // emit data LOG.debug("Reading and sorting data..."); @@ -225,7 +225,7 @@ public class ExternalSortITCase { Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)16/78, 64, 0.7f); + (double)16/78, 64, 0.7f, true); // emit data LOG.debug("Reading and sorting data..."); @@ -281,7 +281,7 @@ public class ExternalSortITCase { Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 16, 0.7f); + (double)64/78, 16, 0.7f, false); // emit data 
LOG.debug("Emitting data..."); @@ -341,7 +341,7 @@ public class ExternalSortITCase { LOG.debug("Initializing sortmerger..."); Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager, - generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f); + generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f, true); // emit data LOG.debug("Emitting data..."); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index 951ce30..c806766 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -128,7 +128,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f); + comparator, 1.0, 1, 128, 0.7f, false); // check order MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator(); @@ -198,7 +198,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f); + comparator, 1.0, 1, 128, 0.7f, true); // check order MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>> iterator = sorter.getIterator(); @@ -283,7 +283,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f); + comparator, 1.0, 1, 128, 0.7f, false); // check order MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator(); @@ -354,7 +354,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f); + comparator, 1.0, 1, 128, 0.7f, true); // check order MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index ece20ff..5136aea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -147,7 +147,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog comp, this.perSortFractionMem, 32, - 0.8f + 0.8f, + false ); this.sorters.add(sorter); this.inputs.add(null); 
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 1737349..116fdec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -142,7 +142,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa public void addInputSorted(MutableObjectIterator<Record> input, RecordComparator comp) throws Exception { UnilateralSortMerger<Record> sorter = new UnilateralSortMerger<Record>( this.memManager, this.ioManager, input, this.owner, RecordSerializerFactory.get(), comp, - this.perSortFractionMem, 32, 0.8f); + this.perSortFractionMem, 32, 0.8f, true); this.sorters.add(sorter); this.inputs.add(null); } http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index 924a16b..e2b2430 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -146,7 +146,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg this.memManager, this.ioManager, input, this.owner, 
this.<IN>getInputSerializer(0), comp, - this.perSortFractionMem, 32, 0.8f); + this.perSortFractionMem, 32, 0.8f, false); } public void addDriverComparator(TypeComparator<IN> comparator) { http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java index 1060e55..f112ff8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java @@ -133,11 +133,11 @@ public class HashVsSortMiniBenchmark { final UnilateralSortMerger<Record> sorter1 = new UnilateralSortMerger<Record>( this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, - this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f); + this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true); final UnilateralSortMerger<Record> sorter2 = new UnilateralSortMerger<Record>( this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2, - this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f); + this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true); final MutableObjectIterator<Record> sortedInput1 = sorter1.getIterator(); final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java index 01dbbc5..8011d21 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java @@ -146,7 +146,7 @@ public class DataSinkProcessor<IT> extends AbstractLogicalIOProcessor { this.runtimeEnvironment.getIOManager(), this.reader, this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(), this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0), - this.config.getSpillingThresholdInput(0)); + this.config.getSpillingThresholdInput(0), false); this.localStrategy = sorter; this.input = sorter.getIterator(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java index a745177..b7cbfb4 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java @@ -378,7 +378,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum)); + this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled()); // set the input to null such that it will be lazily fetched from the input strategy 
this.inputs[inputNum] = null; this.localStrategies[inputNum] = sorter; @@ -414,7 +414,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> { (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum)); + this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled()); cSorter.setUdfConfiguration(this.config.getStubParameters()); // set the input to null such that it will be lazily fetched from the input strategy http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java index c11b93c..c9bd56b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java @@ -91,7 +91,7 @@ public class MassiveStringSorting { MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f, false); MutableObjectIterator<String> sortedData = sorter.getIterator(); @@ -184,7 +184,7 @@ public class MassiveStringSorting { MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new 
StringTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, false); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java index 7a484e7..9a016cc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java @@ -91,7 +91,7 @@ public class MassiveStringValueSorting { MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f, true); MutableObjectIterator<StringValue> sortedData = sorter.getIterator(); @@ -187,7 +187,7 @@ public class MassiveStringValueSorting { MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory<Tuple2<StringValue, 
StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, false); http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala index 7385fa2..a38a19b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala @@ -98,7 +98,7 @@ class MassiveCaseClassSortingITCase { sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator, new DummyInvokable(), new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]), - comparator, 1.0, 4, 0.8f) + comparator, 1.0, 4, 0.8f, false) val sortedData = sorter.getIterator reader.close()
