Repository: flink Updated Branches: refs/heads/master 631b6eb80 -> 9cdd2b3c5
[Runtime] Fix unnecessary object creation for large record sorter This addresses comments made in https://github.com/apache/flink/commit/7df6a3d7266b0f934b76722732176dbf5469bdb4 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cdd2b3c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cdd2b3c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cdd2b3c Branch: refs/heads/master Commit: 9cdd2b3c56f4b9aba4fade43cc90f39e31742e33 Parents: 631b6eb Author: Ufuk Celebi <[email protected]> Authored: Thu Jan 22 12:24:19 2015 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Thu Jan 22 12:26:36 2015 +0100 ---------------------------------------------------------------------- .../operators/sort/LargeRecordHandler.java | 12 +- .../sort/ExternalSortLargeRecordsITCase.java | 174 +++++++++---------- 2 files changed, 91 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9cdd2b3c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java index e1be59a..f494ca7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java @@ -436,21 +436,21 @@ public class LargeRecordHandler<T> { @Override public T next(T reuse) throws IOException { + return next(); + } + + @Override + public T next() throws IOException { Tuple value = tupleInput.next(this.value); if (value != null) { this.value = value; long pointer = value.<Long>getField(pointerPos); - + recordsInputs.seek(pointer); return serializer.deserialize(recordsInputs); } else { return null; } } - - @Override - public T next() throws IOException { - return next(serializer.createInstance()); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9cdd2b3c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index 38442c4..6a0c5bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -98,28 +98,26 @@ public class ExternalSortLargeRecordsITCase { new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); - - MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = - new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() - { - private final Random rnd = new Random(); - private int num = 0; - - @Override - public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) { - if (num++ < NUM_RECORDS) { - long val = rnd.nextLong(); - return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val)); - } - else { - return null; - } - - } + + MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = + new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() { + private final Random rnd = new Random(); + private int num = 0; @Override - public Tuple2<Long, SomeMaybeLongValue> next() throws IOException { - return next(new Tuple2<Long, SomeMaybeLongValue>()); + public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) { + return next(); + } + + @Override + public Tuple2<Long, SomeMaybeLongValue> next() { + if (num++ < NUM_RECORDS) { + long val = rnd.nextLong(); + return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val)); + } + else { + return null; + } } }; @@ -169,28 +167,26 @@ public class ExternalSortLargeRecordsITCase { new TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types); final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> serializer = typeInfo.createSerializer(); final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); - - MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = - new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() - { - private final Random rnd = new Random(); - private int num = -1; - - @Override - public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) { - if (++num < NUM_RECORDS) { - long val = rnd.nextLong(); - return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0)); - } - else { - return null; - } - - } + + MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = + new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>() { + private final Random rnd = new Random(); + private int num = -1; + + @Override + public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) { + return next(); + } @Override - public Tuple2<Long, SomeMaybeLongValue> next() throws IOException { - return new Tuple2<Long, SomeMaybeLongValue>(); + public Tuple2<Long, SomeMaybeLongValue> next() { + if (++num < NUM_RECORDS) { + long val = rnd.nextLong(); + return new Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % LARGE_REC_INTERVAL == 0)); + } + else { + return null; + } } }; @@ -242,38 +238,39 @@ public class ExternalSortLargeRecordsITCase { final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(); final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); - - MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = - new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() - { - private final Random rnd = new Random(); - private int num = -1; - - @Override - public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) { - if (++num < NUM_RECORDS) { - - int size; - if (num % LARGE_REC_INTERVAL == 0) { - size = SmallOrMediumOrLargeValue.LARGE_SIZE; - } else if (num % MEDIUM_REC_INTERVAL == 0) { - size = SmallOrMediumOrLargeValue.MEDIUM_SIZE; - } else { - size = SmallOrMediumOrLargeValue.SMALL_SIZE; + + MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = + new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() { + private final Random rnd = new Random(); + private int num = -1; + + @Override + public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) { + return next(); + } - - long val = rnd.nextLong(); - return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, size)); - } - else { - return null; - } - - } @Override - public Tuple2<Long, SmallOrMediumOrLargeValue> next() throws IOException { - return new Tuple2<Long, SmallOrMediumOrLargeValue>(); + public Tuple2<Long, SmallOrMediumOrLargeValue> next() { + if (++num < NUM_RECORDS) { + + int size; + if (num % LARGE_REC_INTERVAL == 0) { + size = SmallOrMediumOrLargeValue.LARGE_SIZE; + } + else if (num % MEDIUM_REC_INTERVAL == 0) { + size = SmallOrMediumOrLargeValue.MEDIUM_SIZE; + } + else { + size = SmallOrMediumOrLargeValue.SMALL_SIZE; + } + + long val = rnd.nextLong(); + return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, size)); + } + else { + return null; + } } }; @@ -323,28 +320,27 @@ public class ExternalSortLargeRecordsITCase { final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(); final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); - - MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = - new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() - { - private final Random rnd = new Random(); - private int num = -1; - - @Override - public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) { - if (++num < NUM_RECORDS) { - long val = rnd.nextLong(); - return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, SmallOrMediumOrLargeValue.MEDIUM_SIZE)); - } - else { - return null; - } - - } + + MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = + new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() { + private final Random rnd = new Random(); + private int num = -1; + + @Override + public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) { + return next(); + + } @Override - public Tuple2<Long, SmallOrMediumOrLargeValue> next() throws IOException { - return new Tuple2<Long, SmallOrMediumOrLargeValue>(); + public Tuple2<Long, SmallOrMediumOrLargeValue> next() { + if (++num < NUM_RECORDS) { + long val = rnd.nextLong(); + return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, SmallOrMediumOrLargeValue.MEDIUM_SIZE)); + } + else { + return null; + } } };
