This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit efd9cf3233ee33442f37786d934f204209a2fc9a Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Feb 25 13:20:41 2019 +0800 [FLINK-11741] [table] Remove table dataview serializers' ensureCompatibility method using LegacySerializerSnapshotTransformer interface For the Table dataview serializers (i.e. ListViewSerializer and MapViewSerializer) we need to use the LegacySerializerSnapshotTransformer interface because these serializers were incorrectly returning the snapshot of their nested serializer in 1.6, likewise to what the LockableTypeSerializer was incorrectly doing also in 1.6. --- .../flink/table/dataview/ListViewSerializer.scala | 60 ++++++++++-------- .../flink/table/dataview/MapViewSerializer.scala | 72 ++++++++++++---------- 2 files changed, 71 insertions(+), 61 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala index 30257e5..75f9326 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.dataview import org.apache.flink.api.common.typeutils._ -import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.table.api.dataview.ListView @@ -35,7 +35,8 @@ import org.apache.flink.table.api.dataview.ListView */ @SerialVersionUID(-2030398712359267867L) class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]]) - extends TypeSerializer[ListView[T]] { + extends TypeSerializer[ListView[T]] + 
with LegacySerializerSnapshotTransformer[ListView[T]] { override def isImmutableType: Boolean = false @@ -77,33 +78,38 @@ class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T] override def snapshotConfiguration(): ListViewSerializerSnapshot[T] = new ListViewSerializerSnapshot[T](this) - override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = { + /** + * We need to override this as a [[LegacySerializerSnapshotTransformer]] + * because in Flink 1.6.x and below, this serializer was incorrectly returning + * directly the snapshot of the nested list serializer as its own snapshot. + * + * <p>This method transforms the incorrect list serializer snapshot + * to be a proper [[ListViewSerializerSnapshot]]. + */ + override def transformLegacySerializerSnapshot[U]( + legacySnapshot: TypeSerializerSnapshot[U] + ): TypeSerializerSnapshot[ListView[T]] = { + + legacySnapshot match { + case correctSnapshot: ListViewSerializerSnapshot[T] => + correctSnapshot - configSnapshot match { - // backwards compatibility path; - // Flink versions older or equal to 1.6.x returns a - // CollectionSerializerConfigSnapshot as the snapshot case legacySnapshot: CollectionSerializerConfigSnapshot[java.util.List[T], T] => - val previousListSerializerAndConfig = - legacySnapshot.getSingleNestedSerializerAndConfig - - // in older versions, the nested list serializer was always - // specifically a ListSerializer, so this cast is safe - val castedSer = listSerializer.asInstanceOf[ListSerializer[T]] - val compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousListSerializerAndConfig.f0, - classOf[UnloadableDummyTypeSerializer[_]], - previousListSerializerAndConfig.f1, - castedSer.getElementSerializer) - - if (!compatResult.isRequiresMigration) { - CompatibilityResult.compatible[ListView[T]] - } else { - CompatibilityResult.requiresMigration[ListView[T]] - } - - case _ => 
CompatibilityResult.requiresMigration[ListView[T]] + // first, transform the incorrect list serializer's snapshot + // into a proper ListSerializerSnapshot + val transformedNestedListSerializerSnapshot = new ListSerializerSnapshot[T] + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedNestedListSerializerSnapshot, + legacySnapshot.getSingleNestedSerializerAndConfig.f1) + + // then, wrap the transformed ListSerializerSnapshot + // as a nested snapshot in the final resulting ListViewSerializerSnapshot + val transformedListViewSerializerSnapshot = new ListViewSerializerSnapshot[T]() + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedListViewSerializerSnapshot, + transformedNestedListSerializerSnapshot) + + transformedListViewSerializerSnapshot } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala index 9b7bc28..9947c35 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.dataview import org.apache.flink.api.common.typeutils._ -import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.api.common.typeutils.base.{MapSerializerConfigSnapshot, MapSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.table.api.dataview.MapView @@ -37,7 +37,8 @@ import org.apache.flink.table.api.dataview.MapView */ @SerialVersionUID(-9007142882049098705L) class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K, V]]) - extends TypeSerializer[MapView[K, V]] { + extends TypeSerializer[MapView[K, V]] + with 
LegacySerializerSnapshotTransformer[MapView[K, V]] { override def isImmutableType: Boolean = false @@ -78,40 +79,43 @@ class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K, override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] = new MapViewSerializerSnapshot[K, V](this) - // copy and modified from MapSerializer.ensureCompatibility - override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_]) - : CompatibilityResult[MapView[K, V]] = { + /** + * We need to override this as a [[LegacySerializerSnapshotTransformer]] + * because in Flink 1.6.x and below, this serializer was incorrectly returning + * directly the snapshot of the nested map serializer as its own snapshot. + * + * <p>This method transforms the incorrect map serializer snapshot + * to be a proper [[MapViewSerializerSnapshot]]. + */ + override def transformLegacySerializerSnapshot[U]( + legacySnapshot: TypeSerializerSnapshot[U] + ): TypeSerializerSnapshot[MapView[K, V]] = { + + legacySnapshot match { + case correctSnapshot: MapViewSerializerSnapshot[K, V] => + correctSnapshot - configSnapshot match { - // backwards compatibility path; - // Flink versions older or equal to 1.5.x returns a - // MapSerializerConfigSnapshot as the snapshot case legacySnapshot: MapSerializerConfigSnapshot[K, V] => - val previousKvSerializersAndConfigs = - legacySnapshot.getNestedSerializersAndConfigs - - // in older versions, the nested map serializer was always - // specifically a MapSerializer, so this cast is safe - val castedSer = mapSerializer.asInstanceOf[MapSerializer[K, V]] - val keyCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousKvSerializersAndConfigs.get(0).f0, - classOf[UnloadableDummyTypeSerializer[_]], - previousKvSerializersAndConfigs.get(0).f1, - castedSer.getKeySerializer) - - val valueCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousKvSerializersAndConfigs.get(1).f0, - 
classOf[UnloadableDummyTypeSerializer[_]], - previousKvSerializersAndConfigs.get(1).f1, - castedSer.getValueSerializer) - - if (!keyCompatResult.isRequiresMigration && !valueCompatResult.isRequiresMigration) { - CompatibilityResult.compatible[MapView[K, V]] - } else { - CompatibilityResult.requiresMigration[MapView[K, V]] - } - - case _ => CompatibilityResult.requiresMigration[MapView[K, V]] + // first, transform the incorrect map serializer's snapshot + // into a proper MapSerializerSnapshot + val transformedNestedMapSerializerSnapshot = + new MapSerializerSnapshot[K, V] + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedNestedMapSerializerSnapshot, + legacySnapshot.getNestedSerializersAndConfigs.get(0).f1, + legacySnapshot.getNestedSerializersAndConfigs.get(1).f1 + ) + + // then, wrap the transformed MapSerializerSnapshot + // as a nested snapshot in the final resulting MapViewSerializerSnapshot + val transformedMapViewSerializerSnapshot = + new MapViewSerializerSnapshot[K, V]() + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedMapViewSerializerSnapshot, + transformedNestedMapSerializerSnapshot + ) + + transformedMapViewSerializerSnapshot } }
