Repository: flink
Updated Branches:
  refs/heads/master 945fc023a -> d7aa989ee


[FLINK-3321] TupleSerializer.getLength() can return fixed-length size

This closes #1654.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7aa989e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7aa989e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7aa989e

Branch: refs/heads/master
Commit: d7aa989ee1c72d7830c612554b89673bf38cf447
Parents: 945fc02
Author: zentol <s.mo...@web.de>
Authored: Tue Feb 16 12:23:41 2016 +0100
Committer: zentol <s.mo...@web.de>
Committed: Tue Mar 8 12:02:21 2016 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/TupleSerializerBase.java   | 16 +++++++++++++++-
 .../typeutils/runtime/TupleSerializerTest.java   | 19 +++++++++----------
 .../api/scala/runtime/TupleSerializerTest.scala  | 18 +++++++++---------
 3 files changed, 33 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index fc657a1..8b1d8ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -38,6 +38,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
        protected final int arity;
 
+       private int length = -2;
+
        @SuppressWarnings("unchecked")
        public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers) {
                this.tupleClass = Preconditions.checkNotNull(tupleClass);
@@ -56,7 +58,19 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
        @Override
        public int getLength() {
-               return -1;
+               if (length == -2) {
+                       int sum = 0;
+                       for (TypeSerializer<Object> serializer : fieldSerializers) {
+                               if (serializer.getLength() > 0) {
+                                       sum += serializer.getLength();
+                               } else {
+                                       length = -1;
+                                       return length;
+                               }
+                       }
+                       length = sum;
+               }
+               return length;
        }
 
        public int getArity() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
index 017eb44..13f91b0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
@@ -46,7 +46,7 @@ public class TupleSerializerTest {
        public void testTuple0() {
                Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, 
Tuple0.INSTANCE, Tuple0.INSTANCE };
 
-               runTests(testTuples);
+               runTests(1, testTuples);
        }
 
        @Test
@@ -57,7 +57,7 @@ public class TupleSerializerTest {
                        new Tuple1<Integer>(Integer.MAX_VALUE), new 
Tuple1<Integer>(Integer.MIN_VALUE)
                };
                
-               runTests(testTuples);
+               runTests(4, testTuples);
        }
        
        @Test
@@ -74,7 +74,7 @@ public class TupleSerializerTest {
                        new Tuple1<String>("")
                };
                
-               runTests(testTuples);
+               runTests(-1, testTuples);
        }
        
        @Test
@@ -101,7 +101,7 @@ public class TupleSerializerTest {
                        new Tuple1<String[]>(arr2)
                };
                
-               runTests(testTuples);
+               runTests(-1, testTuples);
        }
        
        @Test
@@ -118,7 +118,7 @@ public class TupleSerializerTest {
                                new Tuple2<String, 
Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble())
                        };
                
-               runTests(testTuples);
+               runTests(-1, testTuples);
        }
        
        @Test
@@ -148,7 +148,7 @@ public class TupleSerializerTest {
                        new Tuple2<String, 
String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2)
                };
                
-               runTests(testTuples);
+               runTests(-1, testTuples);
        }
        
 
@@ -212,17 +212,16 @@ public class TupleSerializerTest {
                                new Tuple5<SimpleTypes, Book, 
ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2)
                };
                
-               runTests(testTuples);
+               runTests(-1, testTuples);
        }
 
-       private <T extends Tuple> void runTests(T... instances) {
+       private <T extends Tuple> void runTests(int length, T... instances) {
                try {
                        TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]);
                        TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig());
                        
                        Class<T> tupleClass = tupleTypeInfo.getTypeClass();
-                       
-                       int length = -1;
+
                        if(tupleClass == Tuple0.class) {
                                length = 1;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index 368204b..b210c99 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -40,7 +40,7 @@ class TupleSerializerTest {
   def testTuple1Int(): Unit = {
     val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), 
Tuple1(Int.MaxValue),
       Tuple1(Int.MinValue))
-    runTests(testTuples)
+    runTests(testTuples, 4)
   }
 
   @Test
@@ -53,7 +53,7 @@ class TupleSerializerTest {
       Tuple1(StringUtils.getRandomString(rnd, 30, 170)),
       Tuple1(StringUtils.getRandomString(rnd, 15, 50)),
       Tuple1(""))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -77,7 +77,7 @@ class TupleSerializerTest {
       StringUtils.getRandomString(rnd, 100 * 1024, 105 * 1024),
       "bar")
     val testTuples = Array(Tuple1(arr1), Tuple1(arr2))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -91,7 +91,7 @@ class TupleSerializerTest {
       ("", rnd.nextDouble),
       (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble),
       (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -106,7 +106,7 @@ class TupleSerializerTest {
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)))
       
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -134,7 +134,7 @@ class TupleSerializerTest {
       (StringUtils.getRandomString(rnd, 30, 170), arr1),
       (StringUtils.getRandomString(rnd, 30, 170), arr2),
       (StringUtils.getRandomString(rnd, 30, 170), arr2))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -189,10 +189,10 @@ class TupleSerializerTest {
       (e, b4, o5, ba2, co4),
       (f, b5, o1, ba2, co4),
       (g, b6, o4, ba1, co2))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
-  private final def runTests[T <: Product : TypeInformation](instances: Array[T]) {
+  private final def runTests[T <: Product : TypeInformation](instances: Array[T], length: Int) {
     try {
       // Register the custom Kryo Serializer
       val conf = new ExecutionConfig()
@@ -201,7 +201,7 @@ class TupleSerializerTest {
       val tupleTypeInfo = 
implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
       val serializer = tupleTypeInfo.createSerializer(conf)
       val tupleClass = tupleTypeInfo.getTypeClass
-      val test = new TupleSerializerTestInstance[T](serializer, tupleClass, -1, instances)
+      val test = new TupleSerializerTestInstance[T](serializer, tupleClass, length, instances)
       test.testAll()
     } catch {
       case e: Exception => {

Reply via email to