This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 19a32a1865ad257b8a14111f8f2787e865c8f34b
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Feb 25 15:36:47 2019 +0800

    [FLINK-11740] [core] Clarify CompositeTypeSerializerSnapshot class signature
    
    Prior to this commit, the CompositeTypeSerializerSnapshot class
    signature was a bit confusing and contained raw types. Moreover, it
    required subclasses to always erase types and re-cast.
    
    This closes #7818.
---
 .../common/typeutils/CompositeTypeSerializerSnapshot.java  |  7 ++++---
 .../typeutils/base/GenericArraySerializerSnapshot.java     |  9 +++++----
 .../api/common/typeutils/base/ListSerializerSnapshot.java  |  7 ++++---
 .../api/common/typeutils/base/MapSerializerSnapshot.java   |  6 +++---
 .../api/common/typeutils/base/TypeSerializerSingleton.java |  2 +-
 .../typeutils/runtime/JavaEitherSerializerSnapshot.java    | 14 ++++++++++----
 .../api/java/typeutils/runtime/NullableSerializer.java     |  8 ++------
 .../java/typeutils/runtime/TupleSerializerSnapshot.java    |  9 ++-------
 .../nfa/sharedbuffer/LockableTypeSerializerSnapshot.java   |  6 +++---
 .../flink/runtime/state/ArrayListSerializerSnapshot.java   |  6 +++---
 .../apache/flink/runtime/state/ttl/TtlStateFactory.java    |  7 +------
 .../scala/typeutils/ScalaCaseClassSerializerSnapshot.java  |  9 ++-------
 .../api/scala/typeutils/ScalaEitherSerializerSnapshot.java |  6 +++---
 .../api/scala/typeutils/ScalaOptionSerializerSnapshot.java |  7 +------
 .../api/scala/typeutils/ScalaTrySerializerSnapshot.java    |  7 +------
 .../api/scala/typeutils/TraversableSerializerSnapshot.java |  9 ++-------
 .../scala/typeutils/Tuple2CaseClassSerializerSnapshot.java |  5 ++---
 .../flink/streaming/api/datastream/CoGroupedStreams.java   |  7 +------
 .../api/functions/sink/TwoPhaseCommitSinkFunction.java     |  7 +------
 .../streaming/api/operators/TimerSerializerSnapshot.java   |  9 ++-------
 .../streaming/api/operators/co/IntervalJoinOperator.java   |  7 +------
 .../runtime/streamrecord/StreamElementSerializer.java      |  8 +-------
 .../flink/table/dataview/ListViewSerializerSnapshot.java   |  6 +++---
 .../flink/table/dataview/MapViewSerializerSnapshot.java    |  6 +++---
 24 files changed, 61 insertions(+), 113 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index 0e8b2f3..07c243d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -80,7 +80,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <S> The type of the originating serializer.
  */
 @PublicEvolving
-public abstract class CompositeTypeSerializerSnapshot<T, S extends 
TypeSerializer> implements TypeSerializerSnapshot<T> {
+public abstract class CompositeTypeSerializerSnapshot<T, S extends 
TypeSerializer<T>> implements TypeSerializerSnapshot<T> {
 
        /** Magic number for integrity checks during deserialization. */
        private static final int MAGIC_NUMBER = 911108;
@@ -111,8 +111,9 @@ public abstract class CompositeTypeSerializerSnapshot<T, S 
extends TypeSerialize
         *
         * @param correspondingSerializerClass the expected class of the new 
serializer.
         */
-       public CompositeTypeSerializerSnapshot(Class<S> 
correspondingSerializerClass) {
-               this.correspondingSerializerClass = 
checkNotNull(correspondingSerializerClass);
+       @SuppressWarnings("unchecked")
+       public CompositeTypeSerializerSnapshot(Class<? extends TypeSerializer> 
correspondingSerializerClass) {
+               this.correspondingSerializerClass = (Class<S>) 
checkNotNull(correspondingSerializerClass);
        }
 
        /**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index bb99ed5..f517ec8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -31,7 +31,7 @@ import java.io.IOException;
  *
  * @param <C> The component type.
  */
-public final class GenericArraySerializerSnapshot<C> extends 
CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
+public final class GenericArraySerializerSnapshot<C> extends 
CompositeTypeSerializerSnapshot<C[], GenericArraySerializer<C>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -56,6 +56,7 @@ public final class GenericArraySerializerSnapshot<C> extends 
CompositeTypeSerial
         * Constructor that the legacy {@link 
GenericArraySerializerConfigSnapshot} uses
         * to delegate compatibility checks to this class.
         */
+       @SuppressWarnings("deprecation")
        GenericArraySerializerSnapshot(Class<C> componentClass) {
                super(GenericArraySerializer.class);
                this.componentClass = componentClass;
@@ -77,19 +78,19 @@ public final class GenericArraySerializerSnapshot<C> 
extends CompositeTypeSerial
        }
 
        @Override
-       protected boolean isOuterSnapshotCompatible(GenericArraySerializer 
newSerializer) {
+       protected boolean isOuterSnapshotCompatible(GenericArraySerializer<C> 
newSerializer) {
                return this.componentClass == newSerializer.getComponentClass();
        }
 
        @Override
-       protected GenericArraySerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+       protected GenericArraySerializer<C> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<C> componentSerializer = (TypeSerializer<C>) 
nestedSerializers[0];
                return new GenericArraySerializer<>(componentClass, 
componentSerializer);
        }
 
        @Override
-       protected TypeSerializer<?>[] 
getNestedSerializers(GenericArraySerializer outerSerializer) {
+       protected TypeSerializer<?>[] 
getNestedSerializers(GenericArraySerializer<C> outerSerializer) {
                return new TypeSerializer<?>[] { 
outerSerializer.getComponentSerializer() };
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
index f90e22a..59ae8ed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
@@ -20,12 +20,13 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+
 import java.util.List;
 
 /**
  * Snapshot class for the {@link ListSerializer}.
  */
-public class ListSerializerSnapshot<T> extends 
CompositeTypeSerializerSnapshot<List<T>, ListSerializer> {
+public class ListSerializerSnapshot<T> extends 
CompositeTypeSerializerSnapshot<List<T>, ListSerializer<T>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -49,14 +50,14 @@ public class ListSerializerSnapshot<T> extends 
CompositeTypeSerializerSnapshot<L
        }
 
        @Override
-       protected ListSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+       protected ListSerializer<T> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<T> elementSerializer = (TypeSerializer<T>) 
nestedSerializers[0];
                return new ListSerializer<>(elementSerializer);
        }
 
        @Override
-       protected TypeSerializer<?>[] getNestedSerializers(ListSerializer 
outerSerializer) {
+       protected TypeSerializer<?>[] getNestedSerializers(ListSerializer<T> 
outerSerializer) {
                return new TypeSerializer<?>[] { 
outerSerializer.getElementSerializer() };
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index a6db0ef..3699251 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -26,7 +26,7 @@ import java.util.Map;
 /**
  * Snapshot class for the {@link MapSerializer}.
  */
-public class MapSerializerSnapshot<K, V> extends 
CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
+public class MapSerializerSnapshot<K, V> extends 
CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer<K, V>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -50,7 +50,7 @@ public class MapSerializerSnapshot<K, V> extends 
CompositeTypeSerializerSnapshot
        }
 
        @Override
-       protected MapSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+       protected MapSerializer<K, V> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<K> keySerializer = (TypeSerializer<K>) 
nestedSerializers[0];
 
@@ -61,7 +61,7 @@ public class MapSerializerSnapshot<K, V> extends 
CompositeTypeSerializerSnapshot
        }
 
        @Override
-       protected TypeSerializer<?>[] getNestedSerializers(MapSerializer 
outerSerializer) {
+       protected TypeSerializer<?>[] getNestedSerializers(MapSerializer<K, V> 
outerSerializer) {
                return new TypeSerializer<?>[] { 
outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 5eb3a00..0594ef7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 
 @Internal
-public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
+public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T> {
 
        private static final long serialVersionUID = 8766687317209282373L;
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
index 5036345..e0d4bd5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
@@ -25,7 +25,7 @@ import org.apache.flink.types.Either;
 /**
  * Snapshot class for the {@link EitherSerializer}.
  */
-public class JavaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
+public class JavaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer<L, R>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -50,12 +50,18 @@ public class JavaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializerS
        }
 
        @Override
-       protected EitherSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
-               return new EitherSerializer<>(nestedSerializers[0], 
nestedSerializers[1]);
+       protected EitherSerializer<L, R> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<L> leftSerializer = (TypeSerializer<L>) 
nestedSerializers[0];
+
+               @SuppressWarnings("unchecked")
+               TypeSerializer<R> rightSerializer = (TypeSerializer<R>) 
nestedSerializers[1];
+
+               return new EitherSerializer<>(leftSerializer, rightSerializer);
        }
 
        @Override
-       protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer 
outerSerializer) {
+       protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer<L, 
R> outerSerializer) {
                return new TypeSerializer<?>[]{ 
outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
index aa8dc32..282a12f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -299,7 +299,7 @@ public class NullableSerializer<T> extends 
TypeSerializer<T> {
 
                @SuppressWarnings("unused")
                public NullableSerializerSnapshot() {
-                       super(serializerClass());
+                       super(NullableSerializer.class);
                }
 
                public NullableSerializerSnapshot(NullableSerializer<T> 
serializerInstance) {
@@ -308,7 +308,7 @@ public class NullableSerializer<T> extends 
TypeSerializer<T> {
                }
 
                private NullableSerializerSnapshot(int nullPaddingLength) {
-                       super(serializerClass());
+                       super(NullableSerializer.class);
                        checkArgument(nullPaddingLength >= 0,
                                "Computed NULL padding can not be negative. %d",
                                nullPaddingLength);
@@ -351,10 +351,6 @@ public class NullableSerializer<T> extends 
TypeSerializer<T> {
                protected boolean 
isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) {
                        return nullPaddingLength == 
newSerializer.nullPaddingLength();
                }
-
-               private static <T> Class<NullableSerializer<T>> 
serializerClass() {
-                       return (Class<NullableSerializer<T>>) (Class<?>) 
NullableSerializer.class;
-               }
        }
 
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java
index af437fc..c016f95 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java
@@ -44,7 +44,7 @@ public final class TupleSerializerSnapshot<T extends Tuple>
 
        @SuppressWarnings("unused")
        public TupleSerializerSnapshot() {
-               super(correspondingSerializerClass());
+               super(TupleSerializer.class);
        }
 
        TupleSerializerSnapshot(TupleSerializer<T> serializerInstance) {
@@ -57,7 +57,7 @@ public final class TupleSerializerSnapshot<T extends Tuple>
         * {@link 
TupleSerializer#resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass}.
         */
        TupleSerializerSnapshot(Class<T> tupleClass) {
-               super(correspondingSerializerClass());
+               super(TupleSerializer.class);
                this.tupleClass = checkNotNull(tupleClass, "tuple class can not 
be NULL");
        }
 
@@ -87,9 +87,4 @@ public final class TupleSerializerSnapshot<T extends Tuple>
        protected void readOuterSnapshot(int readOuterSnapshotVersion, 
DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
                this.tupleClass = InstantiationUtil.resolveClassByName(in, 
userCodeClassLoader);
        }
-
-       @SuppressWarnings("unchecked")
-       private static <T extends Tuple> Class<TupleSerializer<T>> 
correspondingSerializerClass() {
-               return (Class<TupleSerializer<T>>) (Class<?>) 
TupleSerializer.class;
-       }
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
index 13867ac..74cc1bc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
  * A {@link TypeSerializerSnapshot} for the {@link 
Lockable.LockableTypeSerializer}.
  */
 @Internal
-public class LockableTypeSerializerSnapshot<E> extends 
CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer> {
+public class LockableTypeSerializerSnapshot<E> extends 
CompositeTypeSerializerSnapshot<Lockable<E>, 
Lockable.LockableTypeSerializer<E>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -51,14 +51,14 @@ public class LockableTypeSerializerSnapshot<E> extends 
CompositeTypeSerializerSn
        }
 
        @Override
-       protected Lockable.LockableTypeSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+       protected Lockable.LockableTypeSerializer<E> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<E> elementSerializer = (TypeSerializer<E>) 
nestedSerializers[0];
                return new Lockable.LockableTypeSerializer<>(elementSerializer);
        }
 
        @Override
-       protected TypeSerializer<?>[] 
getNestedSerializers(Lockable.LockableTypeSerializer outerSerializer) {
+       protected TypeSerializer<?>[] 
getNestedSerializers(Lockable.LockableTypeSerializer<E> outerSerializer) {
                return new TypeSerializer<?>[] { 
outerSerializer.getElementSerializer() };
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
index dde8d1a..3aa4d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
@@ -26,7 +26,7 @@ import java.util.ArrayList;
 /**
  * Snapshot class for the {@link ArrayListSerializer}.
  */
-public class ArrayListSerializerSnapshot<T> extends 
CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> {
+public class ArrayListSerializerSnapshot<T> extends 
CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer<T>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -50,14 +50,14 @@ public class ArrayListSerializerSnapshot<T> extends 
CompositeTypeSerializerSnaps
        }
 
        @Override
-       protected ArrayListSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+       protected ArrayListSerializer<T> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<T> elementSerializer = (TypeSerializer<T>) 
nestedSerializers[0];
                return new ArrayListSerializer<>(elementSerializer);
        }
 
        @Override
-       protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer 
outerSerializer) {
+       protected TypeSerializer<?>[] 
getNestedSerializers(ArrayListSerializer<T> outerSerializer) {
                return new TypeSerializer<?>[] { 
outerSerializer.getElementSerializer() };
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 933b829..37f7d79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -312,7 +312,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends 
State, IS extends S> {
 
                @SuppressWarnings({"WeakerAccess", "unused"})
                public TtlSerializerSnapshot() {
-                       super(correspondingSerializerClass());
+                       super(TtlSerializer.class);
                }
 
                TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) {
@@ -337,10 +337,5 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends 
State, IS extends S> {
 
                        return new TtlSerializer<>(timestampSerializer, 
valueSerializer);
                }
-
-               @SuppressWarnings("unchecked")
-               private static <T> Class<TtlSerializer<T>> 
correspondingSerializerClass() {
-                       return (Class<TtlSerializer<T>>) (Class<?>) 
TtlSerializer.class;
-               }
        }
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
index e26e38e..886a59c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
@@ -49,7 +49,7 @@ public final class ScalaCaseClassSerializerSnapshot<T extends 
scala.Product>
         */
        @SuppressWarnings("unused")
        public ScalaCaseClassSerializerSnapshot() {
-               super(correspondingSerializerClass());
+               super(ScalaCaseClassSerializer.class);
        }
 
        /**
@@ -63,7 +63,7 @@ public final class ScalaCaseClassSerializerSnapshot<T extends 
scala.Product>
         */
        @Internal
        ScalaCaseClassSerializerSnapshot(Class<T> type) {
-               super(correspondingSerializerClass());
+               super(ScalaCaseClassSerializer.class);
                this.type = checkNotNull(type, "type can not be NULL");
        }
 
@@ -106,9 +106,4 @@ public final class ScalaCaseClassSerializerSnapshot<T 
extends scala.Product>
        protected boolean isOuterSnapshotCompatible(ScalaCaseClassSerializer<T> 
newSerializer) {
                return Objects.equals(type, newSerializer.getTupleClass());
        }
-
-       @SuppressWarnings("unchecked")
-       private static <T extends scala.Product> 
Class<ScalaCaseClassSerializer<T>> correspondingSerializerClass() {
-               return (Class<ScalaCaseClassSerializer<T>>) (Class<?>) 
ScalaCaseClassSerializer.class;
-       }
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
index 26cfef5..eecacfd 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -27,7 +27,7 @@ import scala.util.Either;
  * Configuration snapshot for serializers of Scala's {@link Either} type,
  * containing configuration snapshots of the Left and Right serializers.
  */
-public class ScalaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
+public class ScalaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer<L, R>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -51,7 +51,7 @@ public class ScalaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializer
        }
 
        @Override
-       protected EitherSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+       protected EitherSerializer<L, R> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<L> leftSerializer = (TypeSerializer<L>) 
nestedSerializers[0];
 
@@ -62,7 +62,7 @@ public class ScalaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializer
        }
 
        @Override
-       protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer 
outerSerializer) {
+       protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer<L, 
R> outerSerializer) {
                return new TypeSerializer<?>[] { 
outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
        }
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
index 47a72a2..d5e1def 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -33,7 +33,7 @@ public final class ScalaOptionSerializerSnapshot<E> extends 
CompositeTypeSeriali
 
        @SuppressWarnings("WeakerAccess")
        public ScalaOptionSerializerSnapshot() {
-               super(underlyingClass());
+               super(OptionSerializer.class);
        }
 
        public ScalaOptionSerializerSnapshot(OptionSerializer<E> 
serializerInstance) {
@@ -55,9 +55,4 @@ public final class ScalaOptionSerializerSnapshot<E> extends 
CompositeTypeSeriali
                @SuppressWarnings("unchecked") TypeSerializer<E> 
nestedSerializer = (TypeSerializer<E>) nestedSerializers[0];
                return new OptionSerializer<>(nestedSerializer);
        }
-
-       @SuppressWarnings("unchecked")
-       private static <E> Class<OptionSerializer<E>> underlyingClass() {
-               return (Class<OptionSerializer<E>>) (Class<?>) 
OptionSerializer.class;
-       }
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java
index e104838..d5fa813 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java
@@ -38,7 +38,7 @@ public class ScalaTrySerializerSnapshot<E> extends 
CompositeTypeSerializerSnapsh
        /** This empty nullary constructor is required for deserializing the 
configuration. */
        @SuppressWarnings("unused")
        public ScalaTrySerializerSnapshot() {
-               super(correspondingSerializerClass());
+               super(TrySerializer.class);
        }
 
        public ScalaTrySerializerSnapshot(TrySerializer<E> trySerializer) {
@@ -63,9 +63,4 @@ public class ScalaTrySerializerSnapshot<E> extends 
CompositeTypeSerializerSnapsh
 
                return new TrySerializer<>(elementSerializer, valueSerializer);
        }
-
-       @SuppressWarnings("unchecked")
-       private static <E> Class<TrySerializer<E>> 
correspondingSerializerClass() {
-               return (Class<TrySerializer<E>>) (Class<?>) TrySerializer.class;
-       }
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
index a29287c..4cabceb 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
@@ -47,7 +47,7 @@ public class TraversableSerializerSnapshot<T extends 
TraversableOnce<E>, E>
 
        @SuppressWarnings("unused")
        public TraversableSerializerSnapshot() {
-               super(serializerClass());
+               super(TraversableSerializer.class);
        }
 
        public TraversableSerializerSnapshot(TraversableSerializer<T, E> 
serializerInstance) {
@@ -56,7 +56,7 @@ public class TraversableSerializerSnapshot<T extends 
TraversableOnce<E>, E>
        }
 
        TraversableSerializerSnapshot(String cbfCode) {
-               super(serializerClass());
+               super(TraversableSerializer.class);
                checkArgument(cbfCode != null, "cbfCode cannot be null");
 
                this.cbfCode = cbfCode;
@@ -101,9 +101,4 @@ public class TraversableSerializerSnapshot<T extends 
TraversableOnce<E>, E>
        protected boolean isOuterSnapshotCompatible(TraversableSerializer<T, E> 
newSerializer) {
                return cbfCode.equals(newSerializer.cbfCode());
        }
-
-       @SuppressWarnings({"unchecked"})
-       private static <T extends TraversableOnce<E>, E> 
Class<TraversableSerializer<T, E>> serializerClass() {
-               return (Class<TraversableSerializer<T, E>>) (Class<?>) 
TraversableSerializer.class;
-       }
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
index fb3e770..9459181 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
@@ -99,8 +99,7 @@ public final class Tuple2CaseClassSerializerSnapshot<T1, T2>
                return Objects.equals(type, newSerializer.getTupleClass());
        }
 
-       @SuppressWarnings("unchecked")
-       private static <T1, T2> Class<ScalaCaseClassSerializer<Tuple2<T1, T2>>> 
correspondingSerializerClass() {
-               return (Class<ScalaCaseClassSerializer<Tuple2<T1, T2>>>) 
(Class<?>) package$.MODULE$.tuple2ClassForJava();
+       private static <T1, T2> Class<ScalaCaseClassSerializer<scala.Tuple2<T1, 
T2>>> correspondingSerializerClass() {
+               return package$.MODULE$.tuple2ClassForJava();
        }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 9178a82..8183cc6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -669,7 +669,7 @@ public class CoGroupedStreams<T1, T2> {
 
                @SuppressWarnings("WeakerAccess")
                public UnionSerializerSnapshot() {
-                       super(correspondingSerializerClass());
+                       super(UnionSerializer.class);
                }
 
                UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) {
@@ -691,11 +691,6 @@ public class CoGroupedStreams<T1, T2> {
                protected UnionSerializer<T1, T2> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
                        return new UnionSerializer<>((TypeSerializer<T1>) nestedSerializers[0], (TypeSerializer<T2>) nestedSerializers[1]);
                }
-
-               @SuppressWarnings("unchecked")
-               private static <T1, T2> Class<UnionSerializer<T1, T2>> correspondingSerializerClass() {
-                       return (Class<UnionSerializer<T1, T2>>) (Class<?>) UnionSerializer.class;
-               }
        }
 
        // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index ce3bd0f..588592e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -844,7 +844,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 
                @SuppressWarnings("WeakerAccess")
                public StateSerializerSnapshot() {
-                       super(correspondingSerializerClass());
+                       super(StateSerializer.class);
                }
 
                StateSerializerSnapshot(StateSerializer<TXN, CONTEXT> serializerInstance) {
@@ -871,10 +871,5 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
                protected TypeSerializer<?>[] getNestedSerializers(StateSerializer<TXN, CONTEXT> outerSerializer) {
                        return new TypeSerializer<?>[] { outerSerializer.transactionSerializer, outerSerializer.contextSerializer };
                }
-
-               @SuppressWarnings("unchecked")
-               private static <TXN, CONTEXT> Class<StateSerializer<TXN, CONTEXT>> correspondingSerializerClass() {
-                       return (Class<StateSerializer<TXN, CONTEXT>>) (Class<?>) StateSerializer.class;
-               }
        }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
index c28b045..7da3b3f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
@@ -31,7 +31,7 @@ public class TimerSerializerSnapshot<K, N> extends CompositeTypeSerializerSnapsh
        private static final int VERSION = 2;
 
        public TimerSerializerSnapshot() {
-               super(correspondingSerializerClass());
+               super(TimerSerializer.class);
        }
 
        public TimerSerializerSnapshot(TimerSerializer<K, N> timerSerializer) {
@@ -51,16 +51,11 @@ public class TimerSerializerSnapshot<K, N> extends CompositeTypeSerializerSnapsh
                @SuppressWarnings("unchecked")
                final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) nestedSerializers[1];
 
-               return new TimerSerializer<K, N>(keySerializer, namespaceSerializer);
+               return new TimerSerializer<>(keySerializer, namespaceSerializer);
        }
 
        @Override
        protected TypeSerializer<?>[] getNestedSerializers(TimerSerializer<K, N> outerSerializer) {
                return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getNamespaceSerializer() };
        }
-
-       @SuppressWarnings("unchecked")
-       private static <K, N> Class<TimerSerializer<K, N>> correspondingSerializerClass() {
-               return (Class<TimerSerializer<K, N>>) (Class<?>) TimerSerializer.class;
-       }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 84629e4..3c99022 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -509,7 +509,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 
                @SuppressWarnings({"unused", "WeakerAccess"})
                public BufferEntrySerializerSnapshot() {
-                       super(correspondingSerializerClass());
+                       super(BufferEntrySerializer.class);
                }
 
                BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) {
@@ -531,11 +531,6 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
                protected BufferEntrySerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
                        return new BufferEntrySerializer<>((TypeSerializer<T>) nestedSerializers[0]);
                }
-
-               @SuppressWarnings("unchecked")
-               private static <T> Class<BufferEntrySerializer<T>> correspondingSerializerClass() {
-                       return (Class<BufferEntrySerializer<T>>) (Class<?>) BufferEntrySerializer.class;
-               }
        }
 
        @VisibleForTesting
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 10c2f68..3e7b1ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -314,7 +314,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 
                @SuppressWarnings("WeakerAccess")
                public StreamElementSerializerSnapshot() {
-                       super(serializerClass());
+                       super(StreamElementSerializer.class);
                }
 
                StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) {
@@ -338,11 +338,5 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 
                        return new StreamElementSerializer<>(casted);
                }
-
-               @SuppressWarnings("unchecked")
-               private static <T> Class<StreamElementSerializer<T>> serializerClass() {
-                       Class<?> streamElementSerializerClass = StreamElementSerializer.class;
-                       return (Class<StreamElementSerializer<T>>) streamElementSerializerClass;
-               }
        }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index 90468ac..a130271 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -30,7 +30,7 @@ import java.util.List;
  *
  * @param <T> the type of the list elements.
  */
-public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer> {
+public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer<T>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -54,14 +54,14 @@ public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializer
        }
 
        @Override
-       protected ListViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+       protected ListViewSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<List<T>> listSerializer = (TypeSerializer<List<T>>) nestedSerializers[0];
                return new ListViewSerializer<>(listSerializer);
        }
 
        @Override
-       protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer outerSerializer) {
+       protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer<T> outerSerializer) {
                return new TypeSerializer<?>[] { outerSerializer.getListSerializer() };
        }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index 132f42f..618cf5c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -31,7 +31,7 @@ import java.util.Map;
  * @param <K> the key type of the map entries.
  * @param <V> the value type of the map entries.
  */
-public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer> {
+public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer<K, V>> {
 
        private static final int CURRENT_VERSION = 1;
 
@@ -55,14 +55,14 @@ public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnap
        }
 
        @Override
-       protected MapViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+       protected MapViewSerializer<K, V> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
                @SuppressWarnings("unchecked")
                TypeSerializer<Map<K, V>> mapSerializer = (TypeSerializer<Map<K, V>>) nestedSerializers[0];
                return new MapViewSerializer<>(mapSerializer);
        }
 
        @Override
-       protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer outerSerializer) {
+       protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer<K, V> outerSerializer) {
                return new TypeSerializer<?>[] { outerSerializer.getMapSerializer() };
        }
 }

Reply via email to