This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit efd9cf3233ee33442f37786d934f204209a2fc9a
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Feb 25 13:20:41 2019 +0800

    [FLINK-11741] [table] Remove table dataview serializers' ensureCompatibility method using LegacySerializerSnapshotTransformer interface
    
    For the Table dataview serializers (i.e. ListViewSerializer and
    MapViewSerializer), we need to use the
    LegacySerializerSnapshotTransformer interface because, in 1.6, these
    serializers incorrectly returned the snapshot of their nested
    serializer as their own snapshot, the same mistake the
    LockableTypeSerializer made in that release.
---
 .../flink/table/dataview/ListViewSerializer.scala  | 60 ++++++++++--------
 .../flink/table/dataview/MapViewSerializer.scala   | 72 ++++++++++++----------
 2 files changed, 71 insertions(+), 61 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index 30257e5..75f9326 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.dataview
 
 import org.apache.flink.api.common.typeutils._
-import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializerSnapshot}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 import org.apache.flink.table.api.dataview.ListView
 
@@ -35,7 +35,8 @@ import org.apache.flink.table.api.dataview.ListView
   */
 @SerialVersionUID(-2030398712359267867L)
 class ListViewSerializer[T](val listSerializer: 
TypeSerializer[java.util.List[T]])
-  extends TypeSerializer[ListView[T]] {
+  extends TypeSerializer[ListView[T]]
+  with LegacySerializerSnapshotTransformer[ListView[T]] {
 
   override def isImmutableType: Boolean = false
 
@@ -77,33 +78,38 @@ class ListViewSerializer[T](val listSerializer: 
TypeSerializer[java.util.List[T]
   override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
     new ListViewSerializerSnapshot[T](this)
 
-  override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot[_]): 
CompatibilityResult[ListView[T]] = {
+  /**
+    * We need to override this as a [[LegacySerializerSnapshotTransformer]]
+    * because in Flink 1.6.x and below, this serializer was incorrectly 
returning
+    * directly the snapshot of the nested list serializer as its own snapshot.
+    *
+    * <p>This method transforms the incorrect list serializer snapshot
+    * to be a proper [[ListViewSerializerSnapshot]].
+    */
+  override def transformLegacySerializerSnapshot[U](
+      legacySnapshot: TypeSerializerSnapshot[U]
+  ): TypeSerializerSnapshot[ListView[T]] = {
+
+    legacySnapshot match {
+      case correctSnapshot: ListViewSerializerSnapshot[T] =>
+        correctSnapshot
 
-    configSnapshot match {
-      // backwards compatibility path;
-      // Flink versions older or equal to 1.6.x returns a
-      // CollectionSerializerConfigSnapshot as the snapshot
       case legacySnapshot: 
CollectionSerializerConfigSnapshot[java.util.List[T], T] =>
-        val previousListSerializerAndConfig =
-          legacySnapshot.getSingleNestedSerializerAndConfig
-
-        // in older versions, the nested list serializer was always
-        // specifically a ListSerializer, so this cast is safe
-        val castedSer = listSerializer.asInstanceOf[ListSerializer[T]]
-        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-          previousListSerializerAndConfig.f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousListSerializerAndConfig.f1,
-          castedSer.getElementSerializer)
-
-        if (!compatResult.isRequiresMigration) {
-          CompatibilityResult.compatible[ListView[T]]
-        } else {
-          CompatibilityResult.requiresMigration[ListView[T]]
-        }
-
-      case _ => CompatibilityResult.requiresMigration[ListView[T]]
+        // first, transform the incorrect list serializer's snapshot
+        // into a proper ListSerializerSnapshot
+        val transformedNestedListSerializerSnapshot = new 
ListSerializerSnapshot[T]
+        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+          transformedNestedListSerializerSnapshot,
+          legacySnapshot.getSingleNestedSerializerAndConfig.f1)
+
+        // then, wrap the transformed ListSerializerSnapshot
+        // as a nested snapshot in the final resulting 
ListViewSerializerSnapshot
+        val transformedListViewSerializerSnapshot = new 
ListViewSerializerSnapshot[T]()
+        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+          transformedListViewSerializerSnapshot,
+          transformedNestedListSerializerSnapshot)
+
+        transformedListViewSerializerSnapshot
     }
   }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index 9b7bc28..9947c35 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.dataview
 
 import org.apache.flink.api.common.typeutils._
-import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import 
org.apache.flink.api.common.typeutils.base.{MapSerializerConfigSnapshot, 
MapSerializerSnapshot}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 import org.apache.flink.table.api.dataview.MapView
 
@@ -37,7 +37,8 @@ import org.apache.flink.table.api.dataview.MapView
   */
 @SerialVersionUID(-9007142882049098705L)
 class MapViewSerializer[K, V](val mapSerializer: 
TypeSerializer[java.util.Map[K, V]])
-  extends TypeSerializer[MapView[K, V]] {
+  extends TypeSerializer[MapView[K, V]]
+  with LegacySerializerSnapshotTransformer[MapView[K, V]] {
 
   override def isImmutableType: Boolean = false
 
@@ -78,40 +79,43 @@ class MapViewSerializer[K, V](val mapSerializer: 
TypeSerializer[java.util.Map[K,
   override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
     new MapViewSerializerSnapshot[K, V](this)
 
-  // copy and modified from MapSerializer.ensureCompatibility
-  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot[_])
-  : CompatibilityResult[MapView[K, V]] = {
+  /**
+    * We need to override this as a [[LegacySerializerSnapshotTransformer]]
+    * because in Flink 1.6.x and below, this serializer was incorrectly 
returning
+    * directly the snapshot of the nested map serializer as its own snapshot.
+    *
+    * <p>This method transforms the incorrect map serializer snapshot
+    * to be a proper [[MapViewSerializerSnapshot]].
+    */
+  override def transformLegacySerializerSnapshot[U](
+      legacySnapshot: TypeSerializerSnapshot[U]
+  ): TypeSerializerSnapshot[MapView[K, V]] = {
+
+    legacySnapshot match {
+      case correctSnapshot: MapViewSerializerSnapshot[K, V] =>
+        correctSnapshot
 
-    configSnapshot match {
-      // backwards compatibility path;
-      // Flink versions older or equal to 1.5.x returns a
-      // MapSerializerConfigSnapshot as the snapshot
       case legacySnapshot: MapSerializerConfigSnapshot[K, V] =>
-        val previousKvSerializersAndConfigs =
-          legacySnapshot.getNestedSerializersAndConfigs
-
-        // in older versions, the nested map serializer was always
-        // specifically a MapSerializer, so this cast is safe
-        val castedSer = mapSerializer.asInstanceOf[MapSerializer[K, V]]
-        val keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-          previousKvSerializersAndConfigs.get(0).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousKvSerializersAndConfigs.get(0).f1,
-          castedSer.getKeySerializer)
-
-        val valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-          previousKvSerializersAndConfigs.get(1).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousKvSerializersAndConfigs.get(1).f1,
-          castedSer.getValueSerializer)
-
-        if (!keyCompatResult.isRequiresMigration && 
!valueCompatResult.isRequiresMigration) {
-          CompatibilityResult.compatible[MapView[K, V]]
-        } else {
-          CompatibilityResult.requiresMigration[MapView[K, V]]
-        }
-
-      case _ => CompatibilityResult.requiresMigration[MapView[K, V]]
+        // first, transform the incorrect map serializer's snapshot
+        // into a proper MapSerializerSnapshot
+        val transformedNestedMapSerializerSnapshot =
+          new MapSerializerSnapshot[K, V]
+        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+          transformedNestedMapSerializerSnapshot,
+          legacySnapshot.getNestedSerializersAndConfigs.get(0).f1,
+          legacySnapshot.getNestedSerializersAndConfigs.get(1).f1
+        )
+
+        // then, wrap the transformed MapSerializerSnapshot
+        // as a nested snapshot in the final resulting 
MapViewSerializerSnapshot
+        val transformedMapViewSerializerSnapshot =
+          new MapViewSerializerSnapshot[K, V]()
+        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+          transformedMapViewSerializerSnapshot,
+          transformedNestedMapSerializerSnapshot
+        )
+
+        transformedMapViewSerializerSnapshot
     }
   }
 

Reply via email to