This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02d877aeb93fa227df7d95922e6fa4c84bdd11e2
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Feb 25 12:47:03 2019 +0800

    [FLINK-11741] [core] Remove Scala EitherSerializer's ensureCompatibility 
method using LegacySerializerSnapshotTransformer interface
    
    The Scala EitherSerializer requires using the
    LegacySerializerSnapshotTransformer in order to remove the
    ensureCompatibility method because in 1.6, the serializer was returning
    Java's EitherSerializerConfigSnapshot, which has different generic types
    than the expected ScalaEitherSerializerSnapshot.
---
 .../runtime/EitherSerializerConfigSnapshot.java    |  1 -
 .../api/scala/typeutils/EitherSerializer.scala     | 57 +++++++---------------
 2 files changed, 18 insertions(+), 40 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
index a4863b0..c500335 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -63,7 +63,6 @@ public final class EitherSerializerConfigSnapshot<L, R> 
extends CompositeTypeSer
                                nestedSerializersAndConfigs.get(1).f1);
                }
                else {
-                       // Scala Either Serializer, or other.
                        // fall back to the backwards compatibility path
                        return super.resolveSchemaCompatibility(newSerializer);
                }
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 6e7d65e..87896f9 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -30,7 +30,8 @@ import org.apache.flink.core.memory.{DataInputView, 
DataOutputView}
 class EitherSerializer[A, B](
     val leftSerializer: TypeSerializer[A],
     val rightSerializer: TypeSerializer[B])
-  extends TypeSerializer[Either[A, B]] {
+  extends TypeSerializer[Either[A, B]]
+  with LegacySerializerSnapshotTransformer[Either[A, B]] {
 
   override def duplicate: EitherSerializer[A,B] = {
     val leftDup = leftSerializer.duplicate()
@@ -123,44 +124,22 @@ class EitherSerializer[A, B](
     new ScalaEitherSerializerSnapshot[A, B](this)
   }
 
-  override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot[_]): 
CompatibilityResult[Either[A, B]] = {
-
-    configSnapshot match {
-      // backwards compatibility path;
-      // Flink versions older or equal to 1.5.x uses a
-      // EitherSerializerConfigSnapshot as the snapshot
-      case legacyConfig: EitherSerializerConfigSnapshot[A, B] =>
-        checkCompatibility(legacyConfig)
-
-      case _ => CompatibilityResult.requiresMigration()
-    }
-  }
-
-  private def checkCompatibility(
-      configSnapshot: CompositeTypeSerializerConfigSnapshot[_]
-    ): CompatibilityResult[Either[A, B]] = {
-
-    val previousLeftRightSerWithConfigs =
-      configSnapshot.getNestedSerializersAndConfigs
-
-    val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-      previousLeftRightSerWithConfigs.get(0).f0,
-      classOf[UnloadableDummyTypeSerializer[_]],
-      previousLeftRightSerWithConfigs.get(0).f1,
-      leftSerializer)
-
-    val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-      previousLeftRightSerWithConfigs.get(1).f0,
-      classOf[UnloadableDummyTypeSerializer[_]],
-      previousLeftRightSerWithConfigs.get(1).f1,
-      rightSerializer)
-
-    if (leftCompatResult.isRequiresMigration
-      || rightCompatResult.isRequiresMigration) {
-      CompatibilityResult.requiresMigration()
-    } else {
-      CompatibilityResult.compatible()
+  override def transformLegacySerializerSnapshot[U](
+      legacySnapshot: TypeSerializerSnapshot[U]
+  ): TypeSerializerSnapshot[Either[A, B]] = {
+
+    legacySnapshot match {
+      case correctSnapshot: ScalaEitherSerializerSnapshot[A, B] =>
+        correctSnapshot
+
+      case legacySnapshot: EitherSerializerConfigSnapshot[A, B] =>
+        val transformedSnapshot = new ScalaEitherSerializerSnapshot[A, B]()
+        CompositeTypeSerializerUtil.setNestedSerializersSnapshots(
+          transformedSnapshot,
+          legacySnapshot.getNestedSerializersAndConfigs.get(0).f1,
+          legacySnapshot.getNestedSerializersAndConfigs.get(1).f1
+        )
+        transformedSnapshot
     }
   }
 }

Reply via email to