Repository: flink Updated Branches: refs/heads/master 2f16ca2db -> 7bc78cbf9
http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java index a38633c..388e8bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java @@ -35,8 +35,8 @@ public class StringPairSerializer extends TypeSerializer<StringPair> { } @Override - public boolean isStateful() { - return false; + public StringPairSerializer duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index 6169af3..2a76c37 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.commons.lang.SerializationUtils import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase import org.apache.flink.core.memory.{DataOutputView, DataInputView} @@ -28,12 +29,23 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} abstract class CaseClassSerializer[T <: Product]( clazz: Class[T], scalaFieldSerializers: Array[TypeSerializer[_]]) - extends TupleSerializerBase[T](clazz, scalaFieldSerializers) { + extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable { @transient var fields : Array[AnyRef] = _ @transient var instanceCreationFailed : Boolean = false + override def duplicate = { + val result = this.clone().asInstanceOf[CaseClassSerializer[T]] + + // set transient fields to null and make copy of serializers + result.fields = null + result.instanceCreationFailed = false + result.fieldSerializers = fieldSerializers.map(_.duplicate()) + + result + } + def createInstance: T = { if (instanceCreationFailed) { null.asInstanceOf[T] @@ -56,8 +68,6 @@ abstract class CaseClassSerializer[T <: Product]( } } - override def isStateful() = true - def copy(from: T, reuse: T): T = { copy(from) } http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index cb72486..65628a0 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -28,7 +28,7 @@ class EitherSerializer[A, B, T <: Either[A, B]]( val rightSerializer: TypeSerializer[B]) extends TypeSerializer[T] { - override def isStateful: Boolean = false + override def duplicate: EitherSerializer[A,B,T] = this override def createInstance: T = { Left(null).asInstanceOf[T] http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala index 8685cc5..147a060 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala @@ -27,7 +27,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} */ class NothingSerializer extends TypeSerializer[Any] { - override def isStateful: Boolean = false + override def duplicate: NothingSerializer = this override def createInstance: Any = { Integer.valueOf(-1) http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index 4f8f632..488710d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -26,7 +26,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]] { - override def isStateful: Boolean = false + override def duplicate: OptionSerializer[A] = this override def createInstance: Option[A] = { None http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala index fa519d9..38fd14b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala @@ -28,13 +28,25 @@ import scala.collection.generic.CanBuildFrom * Serializer for Scala Collections. */ abstract class TraversableSerializer[T <: TraversableOnce[E], E]( - val elementSerializer: TypeSerializer[E]) - extends TypeSerializer[T] { + var elementSerializer: TypeSerializer[E]) + extends TypeSerializer[T] with Cloneable { def getCbf: CanBuildFrom[T, E, T] @transient var cbf: CanBuildFrom[T, E, T] = getCbf + override def duplicate = { + val duplicateElementSerializer = elementSerializer.duplicate() + if (duplicateElementSerializer == elementSerializer) { + // is not stateful, so return ourselves + this + } else { + val result = this.clone().asInstanceOf[TraversableSerializer[T, E]] + result.elementSerializer = elementSerializer.duplicate() + result + } + } + private def readObject(in: ObjectInputStream): Unit = { in.defaultReadObject() cbf = getCbf @@ -85,8 +97,6 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E]( } } - override def isStateful: Boolean = false - override def deserialize(source: DataInputView): T = { val len = source.readInt() val builder = cbf() http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala index e94c944..1f565f2 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala @@ -29,7 +29,7 @@ import scala.util.{Success, Try, Failure} class TrySerializer[A](val elemSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]] { - override def isStateful: Boolean = false + override def duplicate: TrySerializer[A] = this val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable]) http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java index 5dfe4b2..a5ef3a7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java @@ -24,8 +24,7 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -49,12 +48,7 @@ public class FileSourceFunction extends RichSourceFunction<String> { private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) { TypeSerializer<String> serializer = typeInfo.createSerializer(); - if (serializer.isStateful()) { - return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass()); - } else { - return new RuntimeStatelessSerializerFactory<String>(serializer, - typeInfo.getTypeClass()); - } + return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java index 85faa9e..98f12ec 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java @@ -47,8 +47,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord } @Override - public boolean isStateful() { - return false; + public StreamRecordSerializer<T> duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java index 822b4f2..751ced3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java @@ -35,11 +35,6 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingl } @Override - public boolean isStateful() { - return false; - } - - @Override public VertexWithAdjacencyList createInstance() { return new VertexWithAdjacencyList(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java index e972cd1..8ff0233 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java @@ -35,11 +35,6 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSin } @Override - public boolean isStateful() { - return false; - } - - @Override public VertexWithRankAndDangling createInstance() { return new VertexWithRankAndDangling(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java index 928d4f4..2c3abcd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java @@ -35,11 +35,6 @@ public final class VertexWithRankSerializer extends TypeSerializerSingleton<Vert } @Override - public boolean isStateful() { - return false; - } - - @Override public VertexWithRank createInstance() { return new VertexWithRank(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7bc78cbf/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala index 6e5296b..4f8816f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala @@ -31,7 +31,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.runtime.operators.sort.UnilateralSortMerger -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory import org.junit.Assert._ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable @@ -89,7 +89,7 @@ class MassiveCaseClassSortingITCase { sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory[StringTuple](serializer, classOf[StringTuple]), + new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]), comparator, 1.0, 4, 0.8f) val sortedData = sorter.getIterator
