This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 02d877aeb93fa227df7d95922e6fa4c84bdd11e2 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Feb 25 12:47:03 2019 +0800 [FLINK-11741] [core] Remove Scala EitherSerializer's ensureCompatibility method using LegacySerializerSnapshotTransformer interface The Scala EitherSerializer requires using the LegacySerializerSnapshotTransformer in order to remove the ensureCompatibility method because in 1.6, the serializer was returning Java's EitherSerializerConfigSnapshot, which has different generic types than the expected ScalaEitherSerializerSnapshot. --- .../runtime/EitherSerializerConfigSnapshot.java | 1 - .../api/scala/typeutils/EitherSerializer.scala | 57 +++++++--------------- 2 files changed, 18 insertions(+), 40 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java index a4863b0..c500335 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java @@ -63,7 +63,6 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer nestedSerializersAndConfigs.get(1).f1); } else { - // Scala Either Serializer, or other. // fall back to the backwards compatibility path return super.resolveSchemaCompatibility(newSerializer); } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index 6e7d65e..87896f9 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -30,7 +30,8 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView} class EitherSerializer[A, B]( val leftSerializer: TypeSerializer[A], val rightSerializer: TypeSerializer[B]) - extends TypeSerializer[Either[A, B]] { + extends TypeSerializer[Either[A, B]] + with LegacySerializerSnapshotTransformer[Either[A, B]] { override def duplicate: EitherSerializer[A,B] = { val leftDup = leftSerializer.duplicate() @@ -123,44 +124,22 @@ class EitherSerializer[A, B]( new ScalaEitherSerializerSnapshot[A, B](this) } - override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Either[A, B]] = { - - configSnapshot match { - // backwards compatibility path; - // Flink versions older or equal to 1.5.x uses a - // EitherSerializerConfigSnapshot as the snapshot - case legacyConfig: EitherSerializerConfigSnapshot[A, B] => - checkCompatibility(legacyConfig) - - case _ => CompatibilityResult.requiresMigration() - } - } - - private def checkCompatibility( - configSnapshot: CompositeTypeSerializerConfigSnapshot[_] - ): CompatibilityResult[Either[A, B]] = { - - val previousLeftRightSerWithConfigs = - configSnapshot.getNestedSerializersAndConfigs - - val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousLeftRightSerWithConfigs.get(0).f0, - classOf[UnloadableDummyTypeSerializer[_]], - previousLeftRightSerWithConfigs.get(0).f1, - leftSerializer) - - val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousLeftRightSerWithConfigs.get(1).f0, - classOf[UnloadableDummyTypeSerializer[_]], - previousLeftRightSerWithConfigs.get(1).f1, - rightSerializer) - - if (leftCompatResult.isRequiresMigration - || rightCompatResult.isRequiresMigration) { - CompatibilityResult.requiresMigration() - } else { - CompatibilityResult.compatible() + override def transformLegacySerializerSnapshot[U]( + legacySnapshot: TypeSerializerSnapshot[U] + ): TypeSerializerSnapshot[Either[A, B]] = { + + legacySnapshot match { + case correctSnapshot: ScalaEitherSerializerSnapshot[A, B] => + correctSnapshot + + case legacySnapshot: EitherSerializerConfigSnapshot[A, B] => + val transformedSnapshot = new ScalaEitherSerializerSnapshot[A, B]() + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedSnapshot, + legacySnapshot.getNestedSerializersAndConfigs.get(0).f1, + legacySnapshot.getNestedSerializersAndConfigs.get(1).f1 + ) + transformedSnapshot } } }
