This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 19a32a1865ad257b8a14111f8f2787e865c8f34b Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Feb 25 15:36:47 2019 +0800 [FLINK-11740] [core] Clarify CompositeTypeSerializerSnapshot class signature Prior to this commit, the CompositeTypeSerializerSnapshot class signature was a bit confusing and contained raw types. Moreover, it required subclasses to always erase types and re-cast. This closes #7818. --- .../common/typeutils/CompositeTypeSerializerSnapshot.java | 7 ++++--- .../typeutils/base/GenericArraySerializerSnapshot.java | 9 +++++---- .../api/common/typeutils/base/ListSerializerSnapshot.java | 7 ++++--- .../api/common/typeutils/base/MapSerializerSnapshot.java | 6 +++--- .../api/common/typeutils/base/TypeSerializerSingleton.java | 2 +- .../typeutils/runtime/JavaEitherSerializerSnapshot.java | 14 ++++++++++---- .../api/java/typeutils/runtime/NullableSerializer.java | 8 ++------ .../java/typeutils/runtime/TupleSerializerSnapshot.java | 9 ++------- .../nfa/sharedbuffer/LockableTypeSerializerSnapshot.java | 6 +++--- .../flink/runtime/state/ArrayListSerializerSnapshot.java | 6 +++--- .../apache/flink/runtime/state/ttl/TtlStateFactory.java | 7 +------ .../scala/typeutils/ScalaCaseClassSerializerSnapshot.java | 9 ++------- .../api/scala/typeutils/ScalaEitherSerializerSnapshot.java | 6 +++--- .../api/scala/typeutils/ScalaOptionSerializerSnapshot.java | 7 +------ .../api/scala/typeutils/ScalaTrySerializerSnapshot.java | 7 +------ .../api/scala/typeutils/TraversableSerializerSnapshot.java | 9 ++------- .../scala/typeutils/Tuple2CaseClassSerializerSnapshot.java | 5 ++--- .../flink/streaming/api/datastream/CoGroupedStreams.java | 7 +------ .../api/functions/sink/TwoPhaseCommitSinkFunction.java | 7 +------ .../streaming/api/operators/TimerSerializerSnapshot.java | 9 ++------- .../streaming/api/operators/co/IntervalJoinOperator.java | 7 +------ .../runtime/streamrecord/StreamElementSerializer.java | 8 +------- .../flink/table/dataview/ListViewSerializerSnapshot.java | 6 +++--- .../flink/table/dataview/MapViewSerializerSnapshot.java | 6 +++--- 24 files changed, 61 insertions(+), 113 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java index 0e8b2f3..07c243d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java @@ -80,7 +80,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <S> The type of the originating serializer. */ @PublicEvolving -public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer> implements TypeSerializerSnapshot<T> { +public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer<T>> implements TypeSerializerSnapshot<T> { /** Magic number for integrity checks during deserialization. */ private static final int MAGIC_NUMBER = 911108; @@ -111,8 +111,9 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize * * @param correspondingSerializerClass the expected class of the new serializer. */ - public CompositeTypeSerializerSnapshot(Class<S> correspondingSerializerClass) { - this.correspondingSerializerClass = checkNotNull(correspondingSerializerClass); + @SuppressWarnings("unchecked") + public CompositeTypeSerializerSnapshot(Class<? extends TypeSerializer> correspondingSerializerClass) { + this.correspondingSerializerClass = (Class<S>) checkNotNull(correspondingSerializerClass); } /** diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java index bb99ed5..f517ec8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java @@ -31,7 +31,7 @@ import java.io.IOException; * * @param <C> The component type. */ -public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> { +public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer<C>> { private static final int CURRENT_VERSION = 1; @@ -56,6 +56,7 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial * Constructor that the legacy {@link GenericArraySerializerConfigSnapshot} uses * to delegate compatibility checks to this class. */ + @SuppressWarnings("deprecation") GenericArraySerializerSnapshot(Class<C> componentClass) { super(GenericArraySerializer.class); this.componentClass = componentClass; @@ -77,19 +78,19 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial } @Override - protected boolean isOuterSnapshotCompatible(GenericArraySerializer newSerializer) { + protected boolean isOuterSnapshotCompatible(GenericArraySerializer<C> newSerializer) { return this.componentClass == newSerializer.getComponentClass(); } @Override - protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected GenericArraySerializer<C> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0]; return new GenericArraySerializer<>(componentClass, componentSerializer); } @Override - protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer<C> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() }; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java index f90e22a..59ae8ed 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java @@ -20,12 +20,13 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; + import java.util.List; /** * Snapshot class for the {@link ListSerializer}. */ -public class ListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<List<T>, ListSerializer> { +public class ListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<List<T>, ListSerializer<T>> { private static final int CURRENT_VERSION = 1; @@ -49,14 +50,14 @@ public class ListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<L } @Override - protected ListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected ListSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0]; return new ListSerializer<>(elementSerializer); } @Override - protected TypeSerializer<?>[] getNestedSerializers(ListSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(ListSerializer<T> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() }; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java index a6db0ef..3699251 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java @@ -26,7 +26,7 @@ import java.util.Map; /** * Snapshot class for the {@link MapSerializer}. */ -public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> { +public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer<K, V>> { private static final int CURRENT_VERSION = 1; @@ -50,7 +50,7 @@ public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot } @Override - protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected MapSerializer<K, V> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0]; @@ -61,7 +61,7 @@ public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot } @Override - protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(MapSerializer<K, V> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() }; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java index 5eb3a00..0594ef7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; @Internal -public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{ +public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T> { private static final long serialVersionUID = 8766687317209282373L; diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java index 5036345..e0d4bd5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java @@ -25,7 +25,7 @@ import org.apache.flink.types.Either; /** * Snapshot class for the {@link EitherSerializer}. */ -public class JavaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> { +public class JavaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer<L, R>> { private static final int CURRENT_VERSION = 1; @@ -50,12 +50,18 @@ public class JavaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerS } @Override - protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { - return new EitherSerializer<>(nestedSerializers[0], nestedSerializers[1]); + protected EitherSerializer<L, R> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0]; + + @SuppressWarnings("unchecked") + TypeSerializer<R> rightSerializer = (TypeSerializer<R>) nestedSerializers[1]; + + return new EitherSerializer<>(leftSerializer, rightSerializer); } @Override - protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer<L, R> outerSerializer) { return new TypeSerializer<?>[]{ outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() }; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java index aa8dc32..282a12f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java @@ -299,7 +299,7 @@ public class NullableSerializer<T> extends TypeSerializer<T> { @SuppressWarnings("unused") public NullableSerializerSnapshot() { - super(serializerClass()); + super(NullableSerializer.class); } public NullableSerializerSnapshot(NullableSerializer<T> serializerInstance) { @@ -308,7 +308,7 @@ public class NullableSerializer<T> extends TypeSerializer<T> { } private NullableSerializerSnapshot(int nullPaddingLength) { - super(serializerClass()); + super(NullableSerializer.class); checkArgument(nullPaddingLength >= 0, "Computed NULL padding can not be negative. %d", nullPaddingLength); @@ -351,10 +351,6 @@ public class NullableSerializer<T> extends TypeSerializer<T> { protected boolean isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) { return nullPaddingLength == newSerializer.nullPaddingLength(); } - - private static <T> Class<NullableSerializer<T>> serializerClass() { - return (Class<NullableSerializer<T>>) (Class<?>) NullableSerializer.class; - } } } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java index af437fc..c016f95 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java @@ -44,7 +44,7 @@ public final class TupleSerializerSnapshot<T extends Tuple> @SuppressWarnings("unused") public TupleSerializerSnapshot() { - super(correspondingSerializerClass()); + super(TupleSerializer.class); } TupleSerializerSnapshot(TupleSerializer<T> serializerInstance) { @@ -57,7 +57,7 @@ public final class TupleSerializerSnapshot<T extends Tuple> * {@link TupleSerializer#resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass}. */ TupleSerializerSnapshot(Class<T> tupleClass) { - super(correspondingSerializerClass()); + super(TupleSerializer.class); this.tupleClass = checkNotNull(tupleClass, "tuple class can not be NULL"); } @@ -87,9 +87,4 @@ public final class TupleSerializerSnapshot<T extends Tuple> protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { this.tupleClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); } - - @SuppressWarnings("unchecked") - private static <T extends Tuple> Class<TupleSerializer<T>> correspondingSerializerClass() { - return (Class<TupleSerializer<T>>) (Class<?>) TupleSerializer.class; - } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java index 13867ac..74cc1bc 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java @@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; * A {@link TypeSerializerSnapshot} for the {@link Lockable.LockableTypeSerializer}. */ @Internal -public class LockableTypeSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer> { +public class LockableTypeSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer<E>> { private static final int CURRENT_VERSION = 1; @@ -51,14 +51,14 @@ public class LockableTypeSerializerSnapshot<E> extends CompositeTypeSerializerSn } @Override - protected Lockable.LockableTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected Lockable.LockableTypeSerializer<E> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<E> elementSerializer = (TypeSerializer<E>) nestedSerializers[0]; return new Lockable.LockableTypeSerializer<>(elementSerializer); } @Override - protected TypeSerializer<?>[] getNestedSerializers(Lockable.LockableTypeSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(Lockable.LockableTypeSerializer<E> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() }; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java index dde8d1a..3aa4d8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java @@ -26,7 +26,7 @@ import java.util.ArrayList; /** * Snapshot class for the {@link ArrayListSerializer}. */ -public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> { +public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer<T>> { private static final int CURRENT_VERSION = 1; @@ -50,14 +50,14 @@ public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnaps } @Override - protected ArrayListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected ArrayListSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0]; return new ArrayListSerializer<>(elementSerializer); } @Override - protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer<T> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() }; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 933b829..37f7d79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -312,7 +312,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> { @SuppressWarnings({"WeakerAccess", "unused"}) public TtlSerializerSnapshot() { - super(correspondingSerializerClass()); + super(TtlSerializer.class); } TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) { @@ -337,10 +337,5 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> { return new TtlSerializer<>(timestampSerializer, valueSerializer); } - - @SuppressWarnings("unchecked") - private static <T> Class<TtlSerializer<T>> correspondingSerializerClass() { - return (Class<TtlSerializer<T>>) (Class<?>) TtlSerializer.class; - } } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java index e26e38e..886a59c 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java @@ -49,7 +49,7 @@ public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product> */ @SuppressWarnings("unused") public ScalaCaseClassSerializerSnapshot() { - super(correspondingSerializerClass()); + super(ScalaCaseClassSerializer.class); } /** @@ -63,7 +63,7 @@ public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product> */ @Internal ScalaCaseClassSerializerSnapshot(Class<T> type) { - super(correspondingSerializerClass()); + super(ScalaCaseClassSerializer.class); this.type = checkNotNull(type, "type can not be NULL"); } @@ -106,9 +106,4 @@ public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product> protected boolean isOuterSnapshotCompatible(ScalaCaseClassSerializer<T> newSerializer) { return Objects.equals(type, newSerializer.getTupleClass()); } - - @SuppressWarnings("unchecked") - private static <T extends scala.Product> Class<ScalaCaseClassSerializer<T>> correspondingSerializerClass() { - return (Class<ScalaCaseClassSerializer<T>>) (Class<?>) ScalaCaseClassSerializer.class; - } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java index 26cfef5..eecacfd 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java @@ -27,7 +27,7 @@ import scala.util.Either; * Configuration snapshot for serializers of Scala's {@link Either} type, * containing configuration snapshots of the Left and Right serializers. */ -public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> { +public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer<L, R>> { private static final int CURRENT_VERSION = 1; @@ -51,7 +51,7 @@ public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializer } @Override - protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected EitherSerializer<L, R> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0]; @@ -62,7 +62,7 @@ public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializer } @Override - protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer<L, R> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() }; } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java index 47a72a2..d5e1def 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java @@ -33,7 +33,7 @@ public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSeriali @SuppressWarnings("WeakerAccess") public ScalaOptionSerializerSnapshot() { - super(underlyingClass()); + super(OptionSerializer.class); } public ScalaOptionSerializerSnapshot(OptionSerializer<E> serializerInstance) { @@ -55,9 +55,4 @@ public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSeriali @SuppressWarnings("unchecked") TypeSerializer<E> nestedSerializer = (TypeSerializer<E>) nestedSerializers[0]; return new OptionSerializer<>(nestedSerializer); } - - @SuppressWarnings("unchecked") - private static <E> Class<OptionSerializer<E>> underlyingClass() { - return (Class<OptionSerializer<E>>) (Class<?>) OptionSerializer.class; - } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java index e104838..d5fa813 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java @@ -38,7 +38,7 @@ public class ScalaTrySerializerSnapshot<E> extends CompositeTypeSerializerSnapsh /** This empty nullary constructor is required for deserializing the configuration. */ @SuppressWarnings("unused") public ScalaTrySerializerSnapshot() { - super(correspondingSerializerClass()); + super(TrySerializer.class); } public ScalaTrySerializerSnapshot(TrySerializer<E> trySerializer) { @@ -63,9 +63,4 @@ public class ScalaTrySerializerSnapshot<E> extends CompositeTypeSerializerSnapsh return new TrySerializer<>(elementSerializer, valueSerializer); } - - @SuppressWarnings("unchecked") - private static <E> Class<TrySerializer<E>> correspondingSerializerClass() { - return (Class<TrySerializer<E>>) (Class<?>) TrySerializer.class; - } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java index a29287c..4cabceb 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java @@ -47,7 +47,7 @@ public class TraversableSerializerSnapshot<T extends TraversableOnce<E>, E> @SuppressWarnings("unused") public TraversableSerializerSnapshot() { - super(serializerClass()); + super(TraversableSerializer.class); } public TraversableSerializerSnapshot(TraversableSerializer<T, E> serializerInstance) { @@ -56,7 +56,7 @@ public class TraversableSerializerSnapshot<T extends TraversableOnce<E>, E> } TraversableSerializerSnapshot(String cbfCode) { - super(serializerClass()); + super(TraversableSerializer.class); checkArgument(cbfCode != null, "cbfCode cannot be null"); this.cbfCode = cbfCode; @@ -101,9 +101,4 @@ public class TraversableSerializerSnapshot<T extends TraversableOnce<E>, E> protected boolean isOuterSnapshotCompatible(TraversableSerializer<T, E> newSerializer) { return cbfCode.equals(newSerializer.cbfCode()); } - - @SuppressWarnings({"unchecked"}) - private static <T extends TraversableOnce<E>, E> Class<TraversableSerializer<T, E>> serializerClass() { - return (Class<TraversableSerializer<T, E>>) (Class<?>) TraversableSerializer.class; - } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java index fb3e770..9459181 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java @@ -99,8 +99,7 @@ public final class Tuple2CaseClassSerializerSnapshot<T1, T2> return Objects.equals(type, newSerializer.getTupleClass()); } - @SuppressWarnings("unchecked") - private static <T1, T2> Class<ScalaCaseClassSerializer<Tuple2<T1, T2>>> correspondingSerializerClass() { - return (Class<ScalaCaseClassSerializer<Tuple2<T1, T2>>>) (Class<?>) package$.MODULE$.tuple2ClassForJava(); + private static <T1, T2> Class<ScalaCaseClassSerializer<scala.Tuple2<T1, T2>>> correspondingSerializerClass() { + return package$.MODULE$.tuple2ClassForJava(); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index 9178a82..8183cc6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -669,7 +669,7 @@ public class CoGroupedStreams<T1, T2> { @SuppressWarnings("WeakerAccess") public UnionSerializerSnapshot() { - super(correspondingSerializerClass()); + super(UnionSerializer.class); } UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) { @@ -691,11 +691,6 @@ public class CoGroupedStreams<T1, T2> { protected UnionSerializer<T1, T2> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { return new UnionSerializer<>((TypeSerializer<T1>) nestedSerializers[0], (TypeSerializer<T2>) nestedSerializers[1]); } - - @SuppressWarnings("unchecked") - private static <T1, T2> Class<UnionSerializer<T1, T2>> correspondingSerializerClass() { - return (Class<UnionSerializer<T1, T2>>) (Class<?>) UnionSerializer.class; - } } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index ce3bd0f..588592e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -844,7 +844,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> @SuppressWarnings("WeakerAccess") public StateSerializerSnapshot() { - super(correspondingSerializerClass()); + super(StateSerializer.class); } StateSerializerSnapshot(StateSerializer<TXN, CONTEXT> serializerInstance) { @@ -871,10 +871,5 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> protected TypeSerializer<?>[] getNestedSerializers(StateSerializer<TXN, CONTEXT> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.transactionSerializer, outerSerializer.contextSerializer }; } - - @SuppressWarnings("unchecked") - private static <TXN, CONTEXT> Class<StateSerializer<TXN, CONTEXT>> correspondingSerializerClass() { - return (Class<StateSerializer<TXN, CONTEXT>>) (Class<?>) StateSerializer.class; - } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java index c28b045..7da3b3f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java @@ -31,7 +31,7 @@ public class TimerSerializerSnapshot<K, N> extends CompositeTypeSerializerSnapsh private static final int VERSION = 2; public TimerSerializerSnapshot() { - super(correspondingSerializerClass()); + super(TimerSerializer.class); } public TimerSerializerSnapshot(TimerSerializer<K, N> timerSerializer) { @@ -51,16 +51,11 @@ public class TimerSerializerSnapshot<K, N> extends CompositeTypeSerializerSnapsh @SuppressWarnings("unchecked") final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) nestedSerializers[1]; - return new TimerSerializer<K, N>(keySerializer, namespaceSerializer); + return new TimerSerializer<>(keySerializer, namespaceSerializer); } @Override protected TypeSerializer<?>[] getNestedSerializers(TimerSerializer<K, N> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getNamespaceSerializer() }; } - - @SuppressWarnings("unchecked") - private static <K, N> Class<TimerSerializer<K, N>> correspondingSerializerClass() { - return (Class<TimerSerializer<K, N>>) (Class<?>) TimerSerializer.class; - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java index 84629e4..3c99022 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java @@ -509,7 +509,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT> @SuppressWarnings({"unused", "WeakerAccess"}) public BufferEntrySerializerSnapshot() { - super(correspondingSerializerClass()); + super(BufferEntrySerializer.class); } BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) { @@ -531,11 +531,6 @@ public class IntervalJoinOperator<K, T1, T2, OUT> protected BufferEntrySerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { return new BufferEntrySerializer<>((TypeSerializer<T>) nestedSerializers[0]); } - - @SuppressWarnings("unchecked") - private static <T> Class<BufferEntrySerializer<T>> correspondingSerializerClass() { - return (Class<BufferEntrySerializer<T>>) (Class<?>) BufferEntrySerializer.class; - } } @VisibleForTesting diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 10c2f68..3e7b1ec 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -314,7 +314,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme @SuppressWarnings("WeakerAccess") public StreamElementSerializerSnapshot() { - super(serializerClass()); + super(StreamElementSerializer.class); } StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) { @@ -338,11 +338,5 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme return new StreamElementSerializer<>(casted); } - - @SuppressWarnings("unchecked") - private static <T> Class<StreamElementSerializer<T>> serializerClass() { - Class<?> streamElementSerializerClass = StreamElementSerializer.class; - return (Class<StreamElementSerializer<T>>) streamElementSerializerClass; - } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java index 90468ac..a130271 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java @@ -30,7 +30,7 @@ import java.util.List; * * @param <T> the type of the list elements. */ -public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer> { +public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer<T>> { private static final int CURRENT_VERSION = 1; @@ -54,14 +54,14 @@ public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializer } @Override - protected ListViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected ListViewSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<List<T>> listSerializer = (TypeSerializer<List<T>>) nestedSerializers[0]; return new ListViewSerializer<>(listSerializer); } @Override - protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer<T> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getListSerializer() }; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java index 132f42f..618cf5c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java @@ -31,7 +31,7 @@ import java.util.Map; * @param <K> the key type of the map entries. * @param <V> the value type of the map entries. */ -public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer> { +public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer<K, V>> { private static final int CURRENT_VERSION = 1; @@ -55,14 +55,14 @@ public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnap } @Override - protected MapViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + protected MapViewSerializer<K, V> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { @SuppressWarnings("unchecked") TypeSerializer<Map<K, V>> mapSerializer = (TypeSerializer<Map<K, V>>) nestedSerializers[0]; return new MapViewSerializer<>(mapSerializer); } @Override - protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer outerSerializer) { + protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer<K, V> outerSerializer) { return new TypeSerializer<?>[] { outerSerializer.getMapSerializer() }; } }
