[FLINK-9435][java] optimise ComparableKeySelector and ArrayKeySelector for more efficient Tuple creation

Benchmark results (2 runs per benchmark), obtained by running the benchmarks from
https://github.com/dataArtisans/flink-benchmarks/pull/5:

Benchmark                    Mode  Cnt     Score    Error   Units
================= old =================
KeyByBenchmarks.arrayKeyBy  thrpt   30  1151.305 ± 21.096  ops/ms
KeyByBenchmarks.arrayKeyBy  thrpt   30  1117.486 ± 43.508  ops/ms
KeyByBenchmarks.tupleKeyBy  thrpt   30  1659.634 ± 28.627  ops/ms
KeyByBenchmarks.tupleKeyBy  thrpt   30  1554.265 ± 82.604  ops/ms
================= new =================
KeyByBenchmarks.arrayKeyBy  thrpt   30  1150.552 ± 51.185  ops/ms
KeyByBenchmarks.arrayKeyBy  thrpt   30  1195.777 ± 10.621  ops/ms
KeyByBenchmarks.tupleKeyBy  thrpt   30  1743.633 ± 27.109  ops/ms
KeyByBenchmarks.tupleKeyBy  thrpt   30  1697.885 ± 22.101  ops/ms

This closes #6115.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95eadfe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95eadfe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95eadfe1

Branch: refs/heads/master
Commit: 95eadfe15203ee0ab1459a9ade943234d9d6e7ce
Parents: 5857f55
Author: Nico Kruber <[email protected]>
Authored: Fri May 25 00:09:37 2018 +0200
Committer: Nico Kruber <[email protected]>
Committed: Thu Jul 19 17:07:09 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/java/tuple/Tuple.java  | 32 ++++++++++++++++++++
 .../flink/api/java/tuple/TupleGenerator.java    | 11 +++++++
 .../streaming/util/keys/KeySelectorUtil.java    | 10 +++---
 3 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95eadfe1/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index c282c59..7ce38f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -113,6 +113,38 @@ public abstract class Tuple implements java.io.Serializable {
 
        // BEGIN_OF_TUPLE_DEPENDENT_CODE
        // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+       public static Tuple newInstance(int arity) {
+               switch (arity) {
+                       case 0: return Tuple0.INSTANCE;
+                       case 1: return new Tuple1();
+                       case 2: return new Tuple2();
+                       case 3: return new Tuple3();
+                       case 4: return new Tuple4();
+                       case 5: return new Tuple5();
+                       case 6: return new Tuple6();
+                       case 7: return new Tuple7();
+                       case 8: return new Tuple8();
+                       case 9: return new Tuple9();
+                       case 10: return new Tuple10();
+                       case 11: return new Tuple11();
+                       case 12: return new Tuple12();
+                       case 13: return new Tuple13();
+                       case 14: return new Tuple14();
+                       case 15: return new Tuple15();
+                       case 16: return new Tuple16();
+                       case 17: return new Tuple17();
+                       case 18: return new Tuple18();
+                       case 19: return new Tuple19();
+                       case 20: return new Tuple20();
+                       case 21: return new Tuple21();
+                       case 22: return new Tuple22();
+                       case 23: return new Tuple23();
+                       case 24: return new Tuple24();
+                       case 25: return new Tuple25();
+                       default: throw new IllegalArgumentException("The tuple arity must be in [0, " + MAX_ARITY + "].");
+               }
+       }
+
        private static final Class<?>[] CLASSES = new Class<?>[] {
                Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, 
Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, 
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, 
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, 
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, 
Tuple24.class, Tuple25.class
        };

http://git-wip-us.apache.org/repos/asf/flink/blob/95eadfe1/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index bd5598a..d684967 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -133,6 +133,17 @@ class TupleGenerator {
        private static void modifyTupleType(File root) throws IOException {
                // generate code
                StringBuilder sb = new StringBuilder();
+               sb.append("\tpublic static Tuple newInstance(int arity) {\n");
+               sb.append("\t\tswitch (arity) {\n");
+               // special case for Tuple0:
+               sb.append("\t\t\tcase 0: return Tuple0.INSTANCE;\n");
+               for (int i = FIRST; i <= LAST; i++) {
+                       sb.append("\t\t\tcase ").append(i).append(": return new Tuple").append(i).append("();\n");
+               }
+               sb.append("\t\t\tdefault: throw new IllegalArgumentException(\"The tuple arity must be in [0, \" + MAX_ARITY + \"].\");\n");
+               sb.append("\t\t}\n");
+               sb.append("\t}\n\n");
+
                sb.append("\tprivate static final Class<?>[] CLASSES = new Class<?>[] {\n\t\tTuple0.class");
                for (int i = FIRST; i <= LAST; i++) {
                        sb.append(", Tuple").append(i).append(".class");

http://git-wip-us.apache.org/repos/asf/flink/blob/95eadfe1/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 27ce573..ab608ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -180,8 +180,8 @@ public final class KeySelectorUtil {
                }
 
                @Override
-               public Tuple getKey(IN value) throws Exception {
-                       Tuple key = Tuple.getTupleClass(keyLength).newInstance();
+               public Tuple getKey(IN value) {
+                       Tuple key = Tuple.newInstance(keyLength);
                        comparator.extractKeys(value, keyArray, 0);
                        for (int i = 0; i < keyLength; i++) {
                                key.setField(keyArray[i], i);
@@ -210,18 +210,16 @@ public final class KeySelectorUtil {
                private static final long serialVersionUID = 1L;
 
                private final int[] fields;
-               private final Class<? extends Tuple> tupleClass;
                private transient TupleTypeInfo<Tuple> returnType;
 
                ArrayKeySelector(int[] fields, TupleTypeInfo<Tuple> returnType) {
                        this.fields = requireNonNull(fields);
                        this.returnType = requireNonNull(returnType);
-                       this.tupleClass = Tuple.getTupleClass(fields.length);
                }
 
                @Override
-               public Tuple getKey(IN value) throws Exception {
-                       Tuple key = tupleClass.newInstance();
+               public Tuple getKey(IN value) {
+                       Tuple key = Tuple.newInstance(fields.length);
                        for (int i = 0; i < fields.length; i++) {
                                key.setField(Array.get(value, fields[i]), i);
                        }

Reply via email to