Repository: flink
Updated Branches:
  refs/heads/master 631b6eb80 -> 9cdd2b3c5


[Runtime] Fix unnecessary object creation for large record sorter

This addresses the review comments made on commit
https://github.com/apache/flink/commit/7df6a3d7266b0f934b76722732176dbf5469bdb4


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cdd2b3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cdd2b3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cdd2b3c

Branch: refs/heads/master
Commit: 9cdd2b3c56f4b9aba4fade43cc90f39e31742e33
Parents: 631b6eb
Author: Ufuk Celebi <[email protected]>
Authored: Thu Jan 22 12:24:19 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Jan 22 12:26:36 2015 +0100

----------------------------------------------------------------------
 .../operators/sort/LargeRecordHandler.java      |  12 +-
 .../sort/ExternalSortLargeRecordsITCase.java    | 174 +++++++++----------
 2 files changed, 91 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9cdd2b3c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
index e1be59a..f494ca7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
@@ -436,21 +436,21 @@ public class LargeRecordHandler<T> {
 
                @Override
                public T next(T reuse) throws IOException {
+                       return next();
+               }
+
+               @Override
+               public T next() throws IOException {
                        Tuple value = tupleInput.next(this.value);
                        if (value != null) {
                                this.value = value;
                                long pointer = value.<Long>getField(pointerPos);
-                               
+
                                recordsInputs.seek(pointer);
                                return serializer.deserialize(recordsInputs);
                        } else {
                                return null;
                        }
                }
-
-               @Override
-               public T next() throws IOException {
-                       return next(serializer.createInstance());
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9cdd2b3c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 38442c4..6a0c5bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -98,28 +98,26 @@ public class ExternalSortLargeRecordsITCase {
                                                                new 
TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
                        final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> 
serializer = typeInfo.createSerializer();
                        final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> 
comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
-                       
-                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source = 
-                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>()
-                       {
-                               private final Random rnd = new Random();
-                               private int num = 0;
-                               
-                               @Override
-                               public Tuple2<Long, SomeMaybeLongValue> 
next(Tuple2<Long, SomeMaybeLongValue> reuse) {
-                                       if (num++ < NUM_RECORDS) {
-                                               long val = rnd.nextLong();
-                                               return new Tuple2<Long, 
SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
-                                       }
-                                       else {
-                                               return null;
-                                       }
-                                       
-                               }
+
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source =
+                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>() {
+                                               private final Random rnd = new 
Random();
+                                               private int num = 0;
 
                                                @Override
-                                               public Tuple2<Long, 
SomeMaybeLongValue> next() throws IOException {
-                                                       return next(new 
Tuple2<Long, SomeMaybeLongValue>());
+                                               public Tuple2<Long, 
SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+                                                       return next();
+                                               }
+
+                                               @Override
+                                               public Tuple2<Long, 
SomeMaybeLongValue> next() {
+                                                       if (num++ < 
NUM_RECORDS) {
+                                                               long val = 
rnd.nextLong();
+                                                               return new 
Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val));
+                                                       }
+                                                       else {
+                                                               return null;
+                                                       }
                                                }
                                        };
                        
@@ -169,28 +167,26 @@ public class ExternalSortLargeRecordsITCase {
                                                                new 
TupleTypeInfo<Tuple2<Long,SomeMaybeLongValue>>(types);
                        final TypeSerializer<Tuple2<Long, SomeMaybeLongValue>> 
serializer = typeInfo.createSerializer();
                        final TypeComparator<Tuple2<Long, SomeMaybeLongValue>> 
comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
-                       
-                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source = 
-                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>()
-                       {
-                               private final Random rnd = new Random();
-                               private int num = -1;
-                               
-                               @Override
-                               public Tuple2<Long, SomeMaybeLongValue> 
next(Tuple2<Long, SomeMaybeLongValue> reuse) {
-                                       if (++num < NUM_RECORDS) {
-                                               long val = rnd.nextLong();
-                                               return new Tuple2<Long, 
SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % 
LARGE_REC_INTERVAL == 0));
-                                       }
-                                       else {
-                                               return null;
-                                       }
-                                       
-                               }
+
+                       MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> 
source =
+                                       new MutableObjectIterator<Tuple2<Long, 
SomeMaybeLongValue>>() {
+                                               private final Random rnd = new 
Random();
+                                               private int num = -1;
+
+                                               @Override
+                                               public Tuple2<Long, 
SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
+                                                       return next();
+                                               }
 
                                                @Override
-                                               public Tuple2<Long, 
SomeMaybeLongValue> next() throws IOException {
-                                                       return new Tuple2<Long, 
SomeMaybeLongValue>();
+                                               public Tuple2<Long, 
SomeMaybeLongValue> next() {
+                                                       if (++num < 
NUM_RECORDS) {
+                                                               long val = 
rnd.nextLong();
+                                                               return new 
Tuple2<Long, SomeMaybeLongValue>(val, new SomeMaybeLongValue((int) val, num % 
LARGE_REC_INTERVAL == 0));
+                                                       }
+                                                       else {
+                                                               return null;
+                                                       }
                                                }
                                        };
                        
@@ -242,38 +238,39 @@ public class ExternalSortLargeRecordsITCase {
                        
                        final TypeSerializer<Tuple2<Long, 
SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
                        final TypeComparator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] 
{0}, new boolean[]{false}, 0);
-                       
-                       MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> source = 
-                                       new MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>>()
-                       {
-                               private final Random rnd = new Random();
-                               private int num = -1;
-                               
-                               @Override
-                               public Tuple2<Long, SmallOrMediumOrLargeValue> 
next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
-                                       if (++num < NUM_RECORDS) {
-                                               
-                                               int size;
-                                               if (num % LARGE_REC_INTERVAL == 
0) {
-                                                       size = 
SmallOrMediumOrLargeValue.LARGE_SIZE;
-                                               } else if (num % 
MEDIUM_REC_INTERVAL == 0) {
-                                                       size = 
SmallOrMediumOrLargeValue.MEDIUM_SIZE;
-                                               } else {
-                                                       size = 
SmallOrMediumOrLargeValue.SMALL_SIZE;
+
+                       MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> source =
+                                       new MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>>() {
+                                               private final Random rnd = new 
Random();
+                                               private int num = -1;
+
+                                               @Override
+                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
+                                                       return next();
+
                                                }
-                                               
-                                               long val = rnd.nextLong();
-                                               return new Tuple2<Long, 
SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, size));
-                                       }
-                                       else {
-                                               return null;
-                                       }
-                                       
-                               }
 
                                                @Override
-                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next() throws IOException {
-                                                       return new Tuple2<Long, 
SmallOrMediumOrLargeValue>();
+                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next() {
+                                                       if (++num < 
NUM_RECORDS) {
+
+                                                               int size;
+                                                               if (num % 
LARGE_REC_INTERVAL == 0) {
+                                                                       size = 
SmallOrMediumOrLargeValue.LARGE_SIZE;
+                                                               }
+                                                               else if (num % 
MEDIUM_REC_INTERVAL == 0) {
+                                                                       size = 
SmallOrMediumOrLargeValue.MEDIUM_SIZE;
+                                                               }
+                                                               else {
+                                                                       size = 
SmallOrMediumOrLargeValue.SMALL_SIZE;
+                                                               }
+
+                                                               long val = 
rnd.nextLong();
+                                                               return new 
Tuple2<Long, SmallOrMediumOrLargeValue>(val, new 
SmallOrMediumOrLargeValue((int) val, size));
+                                                       }
+                                                       else {
+                                                               return null;
+                                                       }
                                                }
                                        };
                        
@@ -323,28 +320,27 @@ public class ExternalSortLargeRecordsITCase {
                        
                        final TypeSerializer<Tuple2<Long, 
SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
                        final TypeComparator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] 
{0}, new boolean[]{false}, 0);
-                       
-                       MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> source = 
-                                       new MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>>()
-                       {
-                               private final Random rnd = new Random();
-                               private int num = -1;
-                               
-                               @Override
-                               public Tuple2<Long, SmallOrMediumOrLargeValue> 
next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
-                                       if (++num < NUM_RECORDS) {
-                                               long val = rnd.nextLong();
-                                               return new Tuple2<Long, 
SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, 
SmallOrMediumOrLargeValue.MEDIUM_SIZE));
-                                       }
-                                       else {
-                                               return null;
-                                       }
-                                       
-                               }
+
+                       MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>> source =
+                                       new MutableObjectIterator<Tuple2<Long, 
SmallOrMediumOrLargeValue>>() {
+                                               private final Random rnd = new 
Random();
+                                               private int num = -1;
+
+                                               @Override
+                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
+                                                       return next();
+
+                                               }
 
                                                @Override
-                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next() throws IOException {
-                                                       return new Tuple2<Long, 
SmallOrMediumOrLargeValue>();
+                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next() {
+                                                       if (++num < 
NUM_RECORDS) {
+                                                               long val = 
rnd.nextLong();
+                                                               return new 
Tuple2<Long, SmallOrMediumOrLargeValue>(val, new 
SmallOrMediumOrLargeValue((int) val, SmallOrMediumOrLargeValue.MEDIUM_SIZE));
+                                                       }
+                                                       else {
+                                                               return null;
+                                                       }
                                                }
                                        };
                        

Reply via email to