Repository: flink
Updated Branches:
  refs/heads/master 2f16ca2db -> 7bc78cbf9


http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index a38633c..388e8bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -35,8 +35,8 @@ public class StringPairSerializer extends 
TypeSerializer<StringPair> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public StringPairSerializer duplicate() {
+               return this;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 6169af3..2a76c37 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.commons.lang.SerializationUtils
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
@@ -28,12 +29,23 @@ import org.apache.flink.core.memory.{DataOutputView, 
DataInputView}
 abstract class CaseClassSerializer[T <: Product](
     clazz: Class[T],
     scalaFieldSerializers: Array[TypeSerializer[_]])
-  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
+  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable {
 
   @transient var fields : Array[AnyRef] = _
 
   @transient var instanceCreationFailed : Boolean = false
 
+  override def duplicate = {
+    val result = this.clone().asInstanceOf[CaseClassSerializer[T]]
+
+    // set transient fields to null and make copy of serializers
+    result.fields = null
+    result.instanceCreationFailed = false
+    result.fieldSerializers = fieldSerializers.map(_.duplicate())
+
+    result
+  }
+
   def createInstance: T = {
     if (instanceCreationFailed) {
       null.asInstanceOf[T]
@@ -56,8 +68,6 @@ abstract class CaseClassSerializer[T <: Product](
     }
   }
 
-  override def isStateful() = true
-
   def copy(from: T, reuse: T): T = {
     copy(from)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index cb72486..65628a0 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -28,7 +28,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](
     val rightSerializer: TypeSerializer[B])
   extends TypeSerializer[T] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: EitherSerializer[A,B,T] = this
 
   override def createInstance: T = {
     Left(null).asInstanceOf[T]

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index 8685cc5..147a060 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -27,7 +27,7 @@ import org.apache.flink.core.memory.{DataOutputView, 
DataInputView}
  */
 class NothingSerializer extends TypeSerializer[Any] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: NothingSerializer = this
 
   override def createInstance: Any = {
     Integer.valueOf(-1)

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 4f8f632..488710d 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.{DataOutputView, 
DataInputView}
 class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   extends TypeSerializer[Option[A]] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: OptionSerializer[A] = this
 
   override def createInstance: Option[A] = {
     None

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index fa519d9..38fd14b 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -28,13 +28,25 @@ import scala.collection.generic.CanBuildFrom
  * Serializer for Scala Collections.
  */
 abstract class TraversableSerializer[T <: TraversableOnce[E], E](
-    val elementSerializer: TypeSerializer[E])
-  extends TypeSerializer[T] {
+    var elementSerializer: TypeSerializer[E])
+  extends TypeSerializer[T] with Cloneable {
 
   def getCbf: CanBuildFrom[T, E, T]
 
   @transient var cbf: CanBuildFrom[T, E, T] = getCbf
 
+  override def duplicate = {
+    val duplicateElementSerializer = elementSerializer.duplicate()
+    if (duplicateElementSerializer == elementSerializer) {
+      // is not stateful, so return ourselves
+      this
+    } else {
+      val result = this.clone().asInstanceOf[TraversableSerializer[T, E]]
+      result.elementSerializer = elementSerializer.duplicate()
+      result
+    }
+  }
+
   private def readObject(in: ObjectInputStream): Unit = {
     in.defaultReadObject()
     cbf = getCbf
@@ -85,8 +97,6 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], 
E](
     }
   }
 
-  override def isStateful: Boolean = false
-
   override def deserialize(source: DataInputView): T = {
     val len = source.readInt()
     val builder = cbf()

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index e94c944..1f565f2 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -29,7 +29,7 @@ import scala.util.{Success, Try, Failure}
 class TrySerializer[A](val elemSerializer: TypeSerializer[A])
   extends TypeSerializer[Try[A]] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: TrySerializer[A] = this
 
   val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 5dfe4b2..a5ef3a7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -49,12 +48,7 @@ public class FileSourceFunction extends 
RichSourceFunction<String> {
        private static TypeSerializerFactory<String> 
createSerializer(TypeInformation<String> typeInfo) {
                TypeSerializer<String> serializer = typeInfo.createSerializer();
 
-               if (serializer.isStateful()) {
-                       return new 
RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
-               } else {
-                       return new 
RuntimeStatelessSerializerFactory<String>(serializer,
-                                       typeInfo.getTypeClass());
-               }
+               return new RuntimeSerializerFactory<String>(serializer, 
typeInfo.getTypeClass());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 85faa9e..98f12ec 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -47,8 +47,8 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public StreamRecordSerializer<T> duplicate() {
+               return this;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
index 822b4f2..751ced3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
@@ -35,11 +35,6 @@ public final class VertexWithAdjacencyListSerializer extends 
TypeSerializerSingl
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public VertexWithAdjacencyList createInstance() {
                return new VertexWithAdjacencyList();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
index e972cd1..8ff0233 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
@@ -35,11 +35,6 @@ public final class VertexWithRankAndDanglingSerializer 
extends TypeSerializerSin
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public VertexWithRankAndDangling createInstance() {
                return new VertexWithRankAndDangling();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
index 928d4f4..2c3abcd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
@@ -35,11 +35,6 @@ public final class VertexWithRankSerializer extends 
TypeSerializerSingleton<Vert
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public VertexWithRank createInstance() {
                return new VertexWithRank();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index 6e5296b..4f8816f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -31,7 +31,7 @@ import 
org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory
 import org.junit.Assert._
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 
@@ -89,7 +89,7 @@ class MassiveCaseClassSortingITCase {
         
         sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, 
inputIterator,
               new DummyInvokable(), 
-              new RuntimeStatelessSerializerFactory[StringTuple](serializer, 
classOf[StringTuple]),
+              new RuntimeSerializerFactory[StringTuple](serializer, 
classOf[StringTuple]),
               comparator, 1.0, 4, 0.8f)
             
         val sortedData = sorter.getIterator

Reply via email to