Repository: flink Updated Branches: refs/heads/master 945fc023a -> d7aa989ee
[FLINK-3321] TupleSerializer.getLength() can return fixed-length size This closes #1654. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7aa989e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7aa989e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7aa989e Branch: refs/heads/master Commit: d7aa989ee1c72d7830c612554b89673bf38cf447 Parents: 945fc02 Author: zentol <s.mo...@web.de> Authored: Tue Feb 16 12:23:41 2016 +0100 Committer: zentol <s.mo...@web.de> Committed: Tue Mar 8 12:02:21 2016 +0100 ---------------------------------------------------------------------- .../typeutils/runtime/TupleSerializerBase.java | 16 +++++++++++++++- .../typeutils/runtime/TupleSerializerTest.java | 19 +++++++++---------- .../api/scala/runtime/TupleSerializerTest.scala | 18 +++++++++--------- 3 files changed, 33 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index fc657a1..8b1d8ca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -38,6 +38,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { protected final int arity; + private int length = -2; + @SuppressWarnings("unchecked") public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) { this.tupleClass = Preconditions.checkNotNull(tupleClass); @@ -56,7 +58,19 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { @Override public int getLength() { - return -1; + if (length == -2) { + int sum = 0; + for (TypeSerializer<Object> serializer : fieldSerializers) { + if (serializer.getLength() > 0) { + sum += serializer.getLength(); + } else { + length = -1; + return length; + } + } + length = sum; + } + return length; } public int getArity() { http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java index 017eb44..13f91b0 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java @@ -46,7 +46,7 @@ public class TupleSerializerTest { public void testTuple0() { Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, Tuple0.INSTANCE, Tuple0.INSTANCE }; - runTests(testTuples); + runTests(1, testTuples); } @Test @@ -57,7 +57,7 @@ public class TupleSerializerTest { new Tuple1<Integer>(Integer.MAX_VALUE), new Tuple1<Integer>(Integer.MIN_VALUE) }; - runTests(testTuples); + runTests(4, testTuples); } @Test @@ -74,7 +74,7 @@ public class TupleSerializerTest { new Tuple1<String>("") }; - runTests(testTuples); + runTests(-1, testTuples); } @Test @@ -101,7 +101,7 @@ public class TupleSerializerTest { new Tuple1<String[]>(arr2) }; - runTests(testTuples); + runTests(-1, testTuples); } @Test @@ -118,7 +118,7 @@ public class TupleSerializerTest { new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble()) }; - runTests(testTuples); + runTests(-1, testTuples); } @Test @@ -148,7 +148,7 @@ public class TupleSerializerTest { new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2) }; - runTests(testTuples); + runTests(-1, testTuples); } @@ -212,17 +212,16 @@ public class TupleSerializerTest { new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2) }; - runTests(testTuples); + runTests(-1, testTuples); } - private <T extends Tuple> void runTests(T... instances) { + private <T extends Tuple> void runTests(int length, T... instances) { try { TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]); TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig()); Class<T> tupleClass = tupleTypeInfo.getTypeClass(); - - int length = -1; + if(tupleClass == Tuple0.class) { length = 1; } http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala index 368204b..b210c99 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala @@ -40,7 +40,7 @@ class TupleSerializerTest { def testTuple1Int(): Unit = { val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue), Tuple1(Int.MinValue)) - runTests(testTuples) + runTests(testTuples, 4) } @Test @@ -53,7 +53,7 @@ class TupleSerializerTest { Tuple1(StringUtils.getRandomString(rnd, 30, 170)), Tuple1(StringUtils.getRandomString(rnd, 15, 50)), Tuple1("")) - runTests(testTuples) + runTests(testTuples, -1) } @Test @@ -77,7 +77,7 @@ class TupleSerializerTest { StringUtils.getRandomString(rnd, 100 * 1024, 105 * 1024), "bar") val testTuples = Array(Tuple1(arr1), Tuple1(arr2)) - runTests(testTuples) + runTests(testTuples, -1) } @Test @@ -91,7 +91,7 @@ class TupleSerializerTest { ("", rnd.nextDouble), (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble), (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble)) - runTests(testTuples) + runTests(testTuples, -1) } @Test @@ -106,7 +106,7 @@ class TupleSerializerTest { (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)), (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt))) - runTests(testTuples) + runTests(testTuples, -1) } @Test @@ -134,7 +134,7 @@ class TupleSerializerTest { (StringUtils.getRandomString(rnd, 30, 170), arr1), (StringUtils.getRandomString(rnd, 30, 170), arr2), (StringUtils.getRandomString(rnd, 30, 170), arr2)) - runTests(testTuples) + runTests(testTuples, -1) } @Test @@ -189,10 +189,10 @@ class TupleSerializerTest { (e, b4, o5, ba2, co4), (f, b5, o1, ba2, co4), (g, b6, o4, ba1, co2)) - runTests(testTuples) + runTests(testTuples, -1) } - private final def runTests[T <: Product : TypeInformation](instances: Array[T]) { + private final def runTests[T <: Product : TypeInformation](instances: Array[T], length: Int) { try { // Register the custom Kryo Serializer val conf = new ExecutionConfig() @@ -201,7 +201,7 @@ class TupleSerializerTest { val tupleTypeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]] val serializer = tupleTypeInfo.createSerializer(conf) val tupleClass = tupleTypeInfo.getTypeClass - val test = new TupleSerializerTestInstance[T](serializer, tupleClass, -1, instances) + val test = new TupleSerializerTestInstance[T](serializer, tupleClass, length, instances) test.testAll() } catch { case e: Exception => {