[FLINK-9435][java] optimise ComparableKeySelector and ArrayKeySelector for more efficient Tuple creation
Benchmark results (2 runs) by running the benchmarks from
https://github.com/dataArtisans/flink-benchmarks/pull/5:

Benchmark                     Mode  Cnt     Score    Error   Units
================= old =================
KeyByBenchmarks.arrayKeyBy   thrpt   30  1151.305 ± 21.096  ops/ms
KeyByBenchmarks.arrayKeyBy   thrpt   30  1117.486 ± 43.508  ops/ms
KeyByBenchmarks.tupleKeyBy   thrpt   30  1659.634 ± 28.627  ops/ms
KeyByBenchmarks.tupleKeyBy   thrpt   30  1554.265 ± 82.604  ops/ms
================= new =================
KeyByBenchmarks.arrayKeyBy   thrpt   30  1150.552 ± 51.185  ops/ms
KeyByBenchmarks.arrayKeyBy   thrpt   30  1195.777 ± 10.621  ops/ms
KeyByBenchmarks.tupleKeyBy   thrpt   30  1743.633 ± 27.109  ops/ms
KeyByBenchmarks.tupleKeyBy   thrpt   30  1697.885 ± 22.101  ops/ms

This closes #6115.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/402745eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/402745eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/402745eb

Branch: refs/heads/release-1.6
Commit: 402745ebad3eaf01622ea85524f7ff029fa8df8b
Parents: 0fec75c
Author: Nico Kruber <[email protected]>
Authored: Fri May 25 00:09:37 2018 +0200
Committer: Nico Kruber <[email protected]>
Committed: Thu Jul 19 21:17:38 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/java/tuple/Tuple.java  | 32 ++++++++++++++++++++
 .../flink/api/java/tuple/TupleGenerator.java    | 11 +++++++
 .../streaming/util/keys/KeySelectorUtil.java    | 10 +++---
 3 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/402745eb/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java index 
c282c59..7ce38f8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java @@ -113,6 +113,38 @@ public abstract class Tuple implements java.io.Serializable { // BEGIN_OF_TUPLE_DEPENDENT_CODE // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. + public static Tuple newInstance(int arity) { + switch (arity) { + case 0: return Tuple0.INSTANCE; + case 1: return new Tuple1(); + case 2: return new Tuple2(); + case 3: return new Tuple3(); + case 4: return new Tuple4(); + case 5: return new Tuple5(); + case 6: return new Tuple6(); + case 7: return new Tuple7(); + case 8: return new Tuple8(); + case 9: return new Tuple9(); + case 10: return new Tuple10(); + case 11: return new Tuple11(); + case 12: return new Tuple12(); + case 13: return new Tuple13(); + case 14: return new Tuple14(); + case 15: return new Tuple15(); + case 16: return new Tuple16(); + case 17: return new Tuple17(); + case 18: return new Tuple18(); + case 19: return new Tuple19(); + case 20: return new Tuple20(); + case 21: return new Tuple21(); + case 22: return new Tuple22(); + case 23: return new Tuple23(); + case 24: return new Tuple24(); + case 25: return new Tuple25(); + default: throw new IllegalArgumentException("The tuple arity must be in [0, " + MAX_ARITY + "]."); + } + } + private static final Class<?>[] CLASSES = new Class<?>[] { Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class }; http://git-wip-us.apache.org/repos/asf/flink/blob/402745eb/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java index bd5598a..d684967 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -133,6 +133,17 @@ class TupleGenerator { private static void modifyTupleType(File root) throws IOException { // generate code StringBuilder sb = new StringBuilder(); + sb.append("\tpublic static Tuple newInstance(int arity) {\n"); + sb.append("\t\tswitch (arity) {\n"); + // special case for Tuple0: + sb.append("\t\t\tcase 0: return Tuple0.INSTANCE;\n"); + for (int i = FIRST; i <= LAST; i++) { + sb.append("\t\t\tcase ").append(i).append(": return new Tuple").append(i).append("();\n"); + } + sb.append("\t\t\tdefault: throw new IllegalArgumentException(\"The tuple arity must be in [0, \" + MAX_ARITY + \"].\");\n"); + sb.append("\t\t}\n"); + sb.append("\t}\n\n"); + sb.append("\tprivate static final Class<?>[] CLASSES = new Class<?>[] {\n\t\tTuple0.class"); for (int i = FIRST; i <= LAST; i++) { sb.append(", Tuple").append(i).append(".class"); http://git-wip-us.apache.org/repos/asf/flink/blob/402745eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java index 27ce573..ab608ef 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java @@ -180,8 +180,8 @@ public final class KeySelectorUtil { } 
@Override - public Tuple getKey(IN value) throws Exception { - Tuple key = Tuple.getTupleClass(keyLength).newInstance(); + public Tuple getKey(IN value) { + Tuple key = Tuple.newInstance(keyLength); comparator.extractKeys(value, keyArray, 0); for (int i = 0; i < keyLength; i++) { key.setField(keyArray[i], i); @@ -210,18 +210,16 @@ public final class KeySelectorUtil { private static final long serialVersionUID = 1L; private final int[] fields; - private final Class<? extends Tuple> tupleClass; private transient TupleTypeInfo<Tuple> returnType; ArrayKeySelector(int[] fields, TupleTypeInfo<Tuple> returnType) { this.fields = requireNonNull(fields); this.returnType = requireNonNull(returnType); - this.tupleClass = Tuple.getTupleClass(fields.length); } @Override - public Tuple getKey(IN value) throws Exception { - Tuple key = tupleClass.newInstance(); + public Tuple getKey(IN value) { + Tuple key = Tuple.newInstance(fields.length); for (int i = 0; i < fields.length; i++) { key.setField(Array.get(value, fields[i]), i); }
