This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5dcc28a54d5d8db8f7423a682a3d2d1faddb6671
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Feb 25 12:47:03 2019 +0800

    [FLINK-11741] [core] Remove Scala EitherSerializer's ensureCompatibility method using LegacySerializerSnapshotTransformer interface
    
    The Scala EitherSerializer requires using the
    LegacySerializerSnapshotTransformer in order to remove the
    ensureCompatibility method because in 1.6, the serializer was returning
    Java's EitherSerializerConfigSnapshot, which has different generic types
    than the expected ScalaEitherSerializerSnapshot.
---
 .../runtime/EitherSerializerConfigSnapshot.java    |  1 -
 .../api/scala/typeutils/EitherSerializer.scala     | 57 +++++++---------------
 2 files changed, 18 insertions(+), 40 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index a4863b0..c500335 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -63,7 +63,6 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer
                                nestedSerializersAndConfigs.get(1).f1);
                }
                else {
-                       // Scala Either Serializer, or other.
                        // fall back to the backwards compatibility path
                        return super.resolveSchemaCompatibility(newSerializer);
                }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 6e7d65e..87896f9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -30,7 +30,8 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 class EitherSerializer[A, B](
     val leftSerializer: TypeSerializer[A],
     val rightSerializer: TypeSerializer[B])
-  extends TypeSerializer[Either[A, B]] {
+  extends TypeSerializer[Either[A, B]]
+  with LegacySerializerSnapshotTransformer[Either[A, B]] {
 
   override def duplicate: EitherSerializer[A,B] = {
     val leftDup = leftSerializer.duplicate()
@@ -123,44 +124,22 @@ class EitherSerializer[A, B](
     new ScalaEitherSerializerSnapshot[A, B](this)
   }
 
-  override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Either[A, B]] = {
-
-    configSnapshot match {
-      // backwards compatibility path;
-      // Flink versions older or equal to 1.5.x uses a
-      // EitherSerializerConfigSnapshot as the snapshot
-      case legacyConfig: EitherSerializerConfigSnapshot[A, B] =>
-        checkCompatibility(legacyConfig)
-
-      case _ => CompatibilityResult.requiresMigration()
-    }
-  }
-
-  private def checkCompatibility(
-      configSnapshot: CompositeTypeSerializerConfigSnapshot[_]
-    ): CompatibilityResult[Either[A, B]] = {
-
-    val previousLeftRightSerWithConfigs =
-      configSnapshot.getNestedSerializersAndConfigs
-
-    val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-      previousLeftRightSerWithConfigs.get(0).f0,
-      classOf[UnloadableDummyTypeSerializer[_]],
-      previousLeftRightSerWithConfigs.get(0).f1,
-      leftSerializer)
-
-    val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-      previousLeftRightSerWithConfigs.get(1).f0,
-      classOf[UnloadableDummyTypeSerializer[_]],
-      previousLeftRightSerWithConfigs.get(1).f1,
-      rightSerializer)
-
-    if (leftCompatResult.isRequiresMigration
-      || rightCompatResult.isRequiresMigration) {
-      CompatibilityResult.requiresMigration()
-    } else {
-      CompatibilityResult.compatible()
+  override def transformLegacySerializerSnapshot[U](
+      legacySnapshot: TypeSerializerSnapshot[U]
+  ): TypeSerializerSnapshot[Either[A, B]] = {
+
+    legacySnapshot match {
+      case correctSnapshot: ScalaEitherSerializerSnapshot[A, B] =>
+        correctSnapshot
+
+      case legacySnapshot: EitherSerializerConfigSnapshot[A, B] =>
+        val transformedSnapshot = new ScalaEitherSerializerSnapshot[A, B]()
+        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+          transformedSnapshot,
+          legacySnapshot.getNestedSerializersAndConfigs.get(0).f1,
+          legacySnapshot.getNestedSerializersAndConfigs.get(1).f1
+        )
+        transformedSnapshot
     }
   }
 }

Reply via email to