This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5dcc28a54d5d8db8f7423a682a3d2d1faddb6671 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Feb 25 12:47:03 2019 +0800 [FLINK-11741] [core] Remove Scala EitherSerializer's ensureCompatibility method using LegacySerializerSnapshotTransformer interface The Scala EitherSerializer requires using the LegacySerializerSnapshotTransformer in order to remove the ensureCompatibility method because in 1.6, the serializer was returning Java's EitherSerializerConfigSnapshot, which has different generic types than the expected ScalaEitherSerializerSnapshot. --- .../runtime/EitherSerializerConfigSnapshot.java | 1 - .../api/scala/typeutils/EitherSerializer.scala | 57 +++++++--------------- 2 files changed, 18 insertions(+), 40 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java index a4863b0..c500335 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java @@ -63,7 +63,6 @@ public final class EitherSerializerConfigSnapshot<L, R> extends CompositeTypeSer nestedSerializersAndConfigs.get(1).f1); } else { - // Scala Either Serializer, or other. // fall back to the backwards compatibility path return super.resolveSchemaCompatibility(newSerializer); } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index 6e7d65e..87896f9 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -30,7 +30,8 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView} class EitherSerializer[A, B]( val leftSerializer: TypeSerializer[A], val rightSerializer: TypeSerializer[B]) - extends TypeSerializer[Either[A, B]] { + extends TypeSerializer[Either[A, B]] + with LegacySerializerSnapshotTransformer[Either[A, B]] { override def duplicate: EitherSerializer[A,B] = { val leftDup = leftSerializer.duplicate() @@ -123,44 +124,22 @@ class EitherSerializer[A, B]( new ScalaEitherSerializerSnapshot[A, B](this) } - override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Either[A, B]] = { - - configSnapshot match { - // backwards compatibility path; - // Flink versions older or equal to 1.5.x uses a - // EitherSerializerConfigSnapshot as the snapshot - case legacyConfig: EitherSerializerConfigSnapshot[A, B] => - checkCompatibility(legacyConfig) - - case _ => CompatibilityResult.requiresMigration() - } - } - - private def checkCompatibility( - configSnapshot: CompositeTypeSerializerConfigSnapshot[_] - ): CompatibilityResult[Either[A, B]] = { - - val previousLeftRightSerWithConfigs = - configSnapshot.getNestedSerializersAndConfigs - - val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousLeftRightSerWithConfigs.get(0).f0, - classOf[UnloadableDummyTypeSerializer[_]], - previousLeftRightSerWithConfigs.get(0).f1, - leftSerializer) - - val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousLeftRightSerWithConfigs.get(1).f0, - classOf[UnloadableDummyTypeSerializer[_]], - previousLeftRightSerWithConfigs.get(1).f1, - rightSerializer) - - if (leftCompatResult.isRequiresMigration - || rightCompatResult.isRequiresMigration) { - CompatibilityResult.requiresMigration() - } else { - CompatibilityResult.compatible() + override def transformLegacySerializerSnapshot[U]( + legacySnapshot: TypeSerializerSnapshot[U] + ): TypeSerializerSnapshot[Either[A, B]] = { + + legacySnapshot match { + case correctSnapshot: ScalaEitherSerializerSnapshot[A, B] => + correctSnapshot + + case legacySnapshot: EitherSerializerConfigSnapshot[A, B] => + val transformedSnapshot = new ScalaEitherSerializerSnapshot[A, B]() + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedSnapshot, + legacySnapshot.getNestedSerializersAndConfigs.get(0).f1, + legacySnapshot.getNestedSerializersAndConfigs.get(1).f1 + ) + transformedSnapshot } } }
