This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d1d2256 [FLINK-13266][table] Port DataView related classes to flink-table-common d1d2256 is described below commit d1d2256e22742511faabb6e75c80af75a0a8a58b Author: godfreyhe <godfre...@163.com> AuthorDate: Thu Jul 18 15:10:00 2019 +0800 [FLINK-13266][table] Port DataView related classes to flink-table-common --- .../apache/flink/table/api/dataview/DataView.java | 3 + .../apache/flink/table/api/dataview/ListView.java | 10 +- .../apache/flink/table/api/dataview/MapView.java | 10 +- .../flink/table/dataview}/ListViewSerializer.java | 60 ++++++- .../table/dataview/ListViewSerializerSnapshot.java | 2 + .../flink/table/dataview}/ListViewTypeInfo.java | 4 +- .../table/dataview}/ListViewTypeInfoFactory.java | 4 +- .../flink/table/dataview}/MapViewSerializer.java | 63 ++++++- .../table/dataview/MapViewSerializerSnapshot.java | 2 + .../flink/table/dataview}/MapViewTypeInfo.java | 4 +- .../table/dataview}/MapViewTypeInfoFactory.java | 4 +- .../table/dataview}/NullAwareMapSerializer.java | 4 +- .../flink/table/dataview}/NullSerializer.java | 4 +- .../FirstValueWithRetractAggFunction.java | 4 +- .../LastValueWithRetractAggFunction.java | 4 +- .../apache/flink/table/dataview/DataViewSpec.scala | 1 - .../flink/table/plan/util/AggregateUtil.scala | 9 +- .../apache/flink/table/api/dataview/DataView.scala | 35 ---- .../flink/table/api/dataview/DataViewSpec.scala | 5 +- .../apache/flink/table/api/dataview/ListView.scala | 142 --------------- .../apache/flink/table/api/dataview/MapView.scala | 198 --------------------- .../flink/table/dataview/ListViewSerializer.scala | 117 ------------ .../flink/table/dataview/ListViewTypeInfo.scala | 66 ------- .../table/dataview/ListViewTypeInfoFactory.scala | 43 ----- .../flink/table/dataview/MapViewSerializer.scala | 123 ------------- .../flink/table/dataview/MapViewTypeInfo.scala | 72 -------- .../table/dataview/MapViewTypeInfoFactory.scala | 51 ------ .../aggfunctions/CollectAggFunction.scala | 1 - .../functions/utils/UserDefinedFunctionUtils.scala | 30 ++-- .../table/dataview/ListViewSerializerTest.scala | 1 - .../table/dataview/MapViewSerializerTest.scala | 1 - .../table/dataview/PerKeyStateDataViewStore.java | 2 - .../dataview/PerWindowStateDataViewStore.java | 2 - .../flink/table/dataview/StateDataViewStore.java | 2 - .../table/types/TypeInfoDataTypeConverter.java | 2 +- 35 files changed, 171 insertions(+), 914 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java similarity index 95% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java index 487ef79..8fe6aaf 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/DataView.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java @@ -18,6 +18,8 @@ package org.apache.flink.table.api.dataview; +import org.apache.flink.annotation.PublicEvolving; + import java.io.Serializable; /** @@ -27,6 +29,7 @@ import java.io.Serializable; * <p>Depending on the context in which the {@code AggregateFunction} is * used, a {@link DataView} can be backed by a Java heap collection or a state backend. */ +@PublicEvolving public interface DataView extends Serializable { /** diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java similarity index 95% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java index 5f34e72..e78b09e 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/ListView.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java @@ -18,9 +18,10 @@ package org.apache.flink.table.api.dataview; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.typeutils.ListViewTypeInfoFactory; +import org.apache.flink.table.dataview.ListViewTypeInfoFactory; import java.util.ArrayList; import java.util.List; @@ -75,6 +76,7 @@ import java.util.Objects; * }</pre> */ @TypeInfo(ListViewTypeInfoFactory.class) +@PublicEvolving public class ListView<T> implements DataView { private static final long serialVersionUID = 5502298766901215388L; @@ -106,7 +108,7 @@ public class ListView<T> implements DataView { * Returns an iterable of the list view. * * @throws Exception Thrown if the system cannot get data. - * @return The iterable of the list or { @code null} if the list is empty. + * @return The iterable of the list. */ public Iterable<T> get() throws Exception { return list; @@ -167,8 +169,4 @@ public class ListView<T> implements DataView { return Objects.hash(elementType, list); } - @Override - public String toString() { - return "ListView" + list.toString(); - } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java similarity index 96% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java index aa476be..7feb07b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/dataview/MapView.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java @@ -18,9 +18,10 @@ package org.apache.flink.table.api.dataview; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.typeutils.MapViewTypeInfoFactory; +import org.apache.flink.table.dataview.MapViewTypeInfoFactory; import java.util.HashMap; import java.util.Iterator; @@ -76,6 +77,7 @@ import java.util.Map; * */ @TypeInfo(MapViewTypeInfoFactory.class) +@PublicEvolving public class MapView<K, V> implements DataView { private static final long serialVersionUID = -6185595470714822744L; @@ -108,7 +110,7 @@ public class MapView<K, V> implements DataView { } /** - * Return the value for the specified key or { @code null } if the key is not in the map view. + * Return the value for the specified key or {@code null} if the key is not in the map view. * * @param key The look up key. * @return The value for the specified key. @@ -223,8 +225,4 @@ public class MapView<K, V> implements DataView { return map.hashCode(); } - @Override - public String toString() { - return "MapView" + map.toString(); - } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java similarity index 54% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java index 9eb8af2..b4a3445 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewSerializer.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializer.java @@ -16,10 +16,15 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; +import org.apache.flink.api.common.typeutils.LegacySerializerSnapshotTransformer; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.table.api.dataview.ListView; @@ -36,9 +41,12 @@ import java.util.List; * * @param <T> The type of element in the list. */ -public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> { +@Internal +public class ListViewSerializer<T> + extends TypeSerializer<ListView<T>> + implements LegacySerializerSnapshotTransformer<ListView<T>> { - private static final long serialVersionUID = 3272986300876096397L; + private static final long serialVersionUID = -2030398712359267867L; private final TypeSerializer<List<T>> listSerializer; @@ -63,7 +71,7 @@ public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> { @Override public ListView<T> copy(ListView<T> from) { - return new ListView<>(null, from.list); + return new ListView<>(null, listSerializer.copy(from.list)); } @Override @@ -113,6 +121,48 @@ public class ListViewSerializer<T> extends TypeSerializer<ListView<T>> { @Override public TypeSerializerSnapshot<ListView<T>> snapshotConfiguration() { - throw new UnsupportedOperationException(); + return new ListViewSerializerSnapshot<>(this); } + + /** + * We need to override this as a {@link LegacySerializerSnapshotTransformer} + * because in Flink 1.6.x and below, this serializer was incorrectly returning + * directly the snapshot of the nested list serializer as its own snapshot. + * + * <p>This method transforms the incorrect list serializer snapshot + * to be a proper {@link ListViewSerializerSnapshot}. + */ + @Override + public <U> TypeSerializerSnapshot<ListView<T>> transformLegacySerializerSnapshot( + TypeSerializerSnapshot<U> legacySnapshot) { + if (legacySnapshot instanceof ListViewSerializerSnapshot) { + return (TypeSerializerSnapshot<ListView<T>>) legacySnapshot; + } else if (legacySnapshot instanceof CollectionSerializerConfigSnapshot) { + // first, transform the incorrect list serializer's snapshot + // into a proper ListSerializerSnapshot + ListSerializerSnapshot<T> transformedNestedListSerializerSnapshot = new ListSerializerSnapshot<>(); + CollectionSerializerConfigSnapshot<List<T>, T> snapshot = + (CollectionSerializerConfigSnapshot<List<T>, T>) legacySnapshot; + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedNestedListSerializerSnapshot, + (TypeSerializerSnapshot<?>) (snapshot.getSingleNestedSerializerAndConfig().f1)); + + // then, wrap the transformed ListSerializerSnapshot + // as a nested snapshot in the final resulting ListViewSerializerSnapshot + ListViewSerializerSnapshot<T> transformedListViewSerializerSnapshot = new ListViewSerializerSnapshot<>(); + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedListViewSerializerSnapshot, + transformedNestedListSerializerSnapshot); + + return transformedListViewSerializerSnapshot; + } else { + throw new UnsupportedOperationException( + legacySnapshot.getClass().getCanonicalName() + " is not supported."); + } + } + + public TypeSerializer<List<T>> getListSerializer() { + return listSerializer; + } + } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java similarity index 97% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java index a130271..72547fd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java @@ -18,6 +18,7 @@ package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; @@ -30,6 +31,7 @@ import java.util.List; * * @param <T> the type of the list elements. */ +@Internal public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer<T>> { private static final int CURRENT_VERSION = 1; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java similarity index 97% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java index f0c71a4..951b140 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfo.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfo.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -29,6 +30,7 @@ import org.apache.flink.table.api.dataview.ListView; * * @param <T> element type */ +@Internal public class ListViewTypeInfo<T> extends TypeInformation<ListView<T>> { private static final long serialVersionUID = 6468505781419989441L; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java similarity index 94% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java index 7dc9227..9aef459 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/ListViewTypeInfoFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewTypeInfoFactory.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.GenericTypeInfo; @@ -29,6 +30,7 @@ import java.util.Map; /** * TypeInformation factory for {@link ListView}. */ +@Internal public class ListViewTypeInfoFactory<T> extends TypeInfoFactory<ListView<T>> { @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java similarity index 55% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java index 3f697d6..b34b698 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewSerializer.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializer.java @@ -16,10 +16,15 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; +import org.apache.flink.api.common.typeutils.LegacySerializerSnapshotTransformer; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.table.api.dataview.MapView; @@ -38,8 +43,12 @@ import java.util.Map; * @param <K> The type of the keys in the map. * @param <V> The type of the values in the map. */ -public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> { - private static final long serialVersionUID = 202624224378185203L; +@Internal +public class MapViewSerializer<K, V> + extends TypeSerializer<MapView<K, V>> + implements LegacySerializerSnapshotTransformer<MapView<K, V>> { + + private static final long serialVersionUID = -9007142882049098705L; private final TypeSerializer<Map<K, V>> mapSerializer; @@ -54,7 +63,7 @@ public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> { @Override public TypeSerializer<MapView<K, V>> duplicate() { - return new MapViewSerializer<>(mapSerializer); + return new MapViewSerializer<>(mapSerializer.duplicate()); } @Override @@ -114,6 +123,50 @@ public class MapViewSerializer<K, V> extends TypeSerializer<MapView<K, V>> { @Override public TypeSerializerSnapshot<MapView<K, V>> snapshotConfiguration() { - throw new UnsupportedOperationException(); + return new MapViewSerializerSnapshot<>(this); + } + + /** + * We need to override this as a {@link LegacySerializerSnapshotTransformer} + * because in Flink 1.6.x and below, this serializer was incorrectly returning + * directly the snapshot of the nested map serializer as its own snapshot. + * + * <p>This method transforms the incorrect map serializer snapshot + * to be a proper {@link MapViewSerializerSnapshot}. + */ + @Override + public <U> TypeSerializerSnapshot<MapView<K, V>> transformLegacySerializerSnapshot( + TypeSerializerSnapshot<U> legacySnapshot) { + if (legacySnapshot instanceof MapViewSerializerSnapshot) { + return (TypeSerializerSnapshot<MapView<K, V>>) legacySnapshot; + } else if (legacySnapshot instanceof MapSerializerConfigSnapshot) { + // first, transform the incorrect map serializer's snapshot + // into a proper ListSerializerSnapshot + MapSerializerSnapshot<K, V> transformedNestedMapSerializerSnapshot = new MapSerializerSnapshot<>(); + MapSerializerConfigSnapshot<K, V> snapshot = (MapSerializerConfigSnapshot<K, V>) legacySnapshot; + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedNestedMapSerializerSnapshot, + snapshot.getNestedSerializersAndConfigs().get(0).f1, + snapshot.getNestedSerializersAndConfigs().get(1).f1 + ); + + // then, wrap the transformed MapSerializerSnapshot + // as a nested snapshot in the final resulting MapViewSerializerSnapshot + MapViewSerializerSnapshot<K, V> transformedMapViewSerializerSnapshot = new MapViewSerializerSnapshot<>(); + CompositeTypeSerializerUtil.setNestedSerializersSnapshots( + transformedMapViewSerializerSnapshot, + transformedNestedMapSerializerSnapshot + ); + + return transformedMapViewSerializerSnapshot; + } else { + throw new UnsupportedOperationException( + legacySnapshot.getClass().getCanonicalName() + " is not supported."); + } + } + + public TypeSerializer<Map<K, V>> getMapSerializer() { + return mapSerializer; } + } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java similarity index 97% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java index 618cf5c..8b58449 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java @@ -18,6 +18,7 @@ package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; @@ -31,6 +32,7 @@ import java.util.Map; * @param <K> the key type of the map entries. * @param <V> the value type of the map entries. */ +@Internal public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer<K, V>> { private static final int CURRENT_VERSION = 1; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java similarity index 97% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java index b9ee0b2..af40f28 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfo.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfo.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -32,6 +33,7 @@ import java.util.Objects; * @param <K> key type * @param <V> value type */ +@Internal public class MapViewTypeInfo<K, V> extends TypeInformation<MapView<K, V>> { private static final long serialVersionUID = -2883944144965318259L; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java similarity index 95% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java index fa8abdb..294806d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/MapViewTypeInfoFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewTypeInfoFactory.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.GenericTypeInfo; @@ -29,6 +30,7 @@ import java.util.Map; /** * TypeInformation factory for {@link MapView}. */ +@Internal public class MapViewTypeInfoFactory<K, V> extends TypeInfoFactory<MapView<K, V>> { @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java similarity index 98% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java index be08836..dc0b691 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullAwareMapSerializer.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializer.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -31,6 +32,7 @@ import java.util.Map; * The {@link NullAwareMapSerializer} is similar to MapSerializer, the only difference is that * the {@link NullAwareMapSerializer} can handle null keys. */ +@Internal public class NullAwareMapSerializer<K, V> extends TypeSerializer<Map<K, V>> { private static final long serialVersionUID = 5363147328373166590L; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java similarity index 95% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java index 9d20f8e..8964d95 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/NullSerializer.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullSerializer.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.typeutils; +package org.apache.flink.table.dataview; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.core.memory.DataInputView; @@ -28,6 +29,7 @@ import java.io.IOException; /** * A serializer for null. */ +@Internal public class NullSerializer extends TypeSerializerSingleton<Object> { private static final long serialVersionUID = -5381596724707742625L; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java index 344e797..78cb668 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/FirstValueWithRetractAggFunction.java @@ -36,6 +36,8 @@ import org.apache.flink.table.dataformat.BinaryGeneric; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.dataformat.Decimal; import org.apache.flink.table.dataformat.GenericRow; +import org.apache.flink.table.dataview.MapViewSerializer; +import org.apache.flink.table.dataview.MapViewTypeInfo; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.LogicalType; @@ -45,8 +47,6 @@ import org.apache.flink.table.typeutils.BinaryStringSerializer; import org.apache.flink.table.typeutils.BinaryStringTypeInfo; import org.apache.flink.table.typeutils.DecimalSerializer; import org.apache.flink.table.typeutils.DecimalTypeInfo; -import org.apache.flink.table.typeutils.MapViewSerializer; -import org.apache.flink.table.typeutils.MapViewTypeInfo; import java.util.ArrayList; import java.util.Iterator; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java index d07acfb..f6df480 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LastValueWithRetractAggFunction.java @@ -36,6 +36,8 @@ import org.apache.flink.table.dataformat.BinaryGeneric; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.dataformat.Decimal; import org.apache.flink.table.dataformat.GenericRow; +import org.apache.flink.table.dataview.MapViewSerializer; +import org.apache.flink.table.dataview.MapViewTypeInfo; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.LogicalType; @@ -45,8 +47,6 @@ import org.apache.flink.table.typeutils.BinaryStringSerializer; import org.apache.flink.table.typeutils.BinaryStringTypeInfo; import org.apache.flink.table.typeutils.DecimalSerializer; import org.apache.flink.table.typeutils.DecimalTypeInfo; -import org.apache.flink.table.typeutils.MapViewSerializer; -import org.apache.flink.table.typeutils.MapViewTypeInfo; import java.util.ArrayList; import java.util.Iterator; diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala index 9698758..7872664 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/dataview/DataViewSpec.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.dataview import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.typeutils.{ListViewTypeInfo, MapViewTypeInfo} /** * Data view specification. diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala index 31784aa..933ec38 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala @@ -19,12 +19,12 @@ package org.apache.flink.table.plan.util import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.table.JLong -import org.apache.flink.table.api.{DataTypes, TableConfig, ExecutionConfigOptions, TableException} +import org.apache.flink.table.api.{DataTypes, ExecutionConfigOptions, TableConfig, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.dataformat.BaseRow import org.apache.flink.table.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType -import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec} +import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec, MapViewTypeInfo} import org.apache.flink.table.expressions.ExpressionUtils.extractValue import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction @@ -42,8 +42,7 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot import org.apache.flink.table.types.logical.{LogicalTypeRoot, _} import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType -import org.apache.flink.table.typeutils.{BinaryStringTypeInfo, MapViewTypeInfo} - +import org.apache.flink.table.typeutils.BinaryStringTypeInfo import org.apache.calcite.rel.`type`._ import org.apache.calcite.rel.core.{Aggregate, AggregateCall} import org.apache.calcite.rex.RexInputRef @@ -51,8 +50,6 @@ import org.apache.calcite.sql.fun._ import org.apache.calcite.sql.validate.SqlMonotonicity import org.apache.calcite.sql.{SqlKind, SqlRankFunction} import org.apache.calcite.tools.RelBuilder - -import java.lang.{Long => JLong} import java.time.Duration import java.util diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala deleted file mode 100644 index 2214086..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataView.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.dataview - -/** - * A [[DataView]] is a collection type that can be used in the accumulator of an - * [[org.apache.flink.table.functions.AggregateFunction]]. - * - * Depending on the context in which the [[org.apache.flink.table.functions.AggregateFunction]] is - * used, a [[DataView]] can be backed by a Java heap collection or a state backend. - */ -trait DataView extends Serializable { - - /** - * Clears the [[DataView]] and removes all data. - */ - def clear(): Unit - -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala index 943fe03..c309bf1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.api.dataview import java.lang.reflect.Field - import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, State, StateDescriptor} import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo} @@ -41,7 +40,7 @@ case class ListViewSpec[T]( extends DataViewSpec[ListView[T]] { override def toStateDescriptor: StateDescriptor[_ <: State, _] = - new ListStateDescriptor[T](stateId, listViewTypeInfo.elementType) + new ListStateDescriptor[T](stateId, listViewTypeInfo.getElementType) } case class MapViewSpec[K, V]( @@ -51,5 +50,5 @@ case class MapViewSpec[K, V]( extends DataViewSpec[MapView[K, V]] { override def toStateDescriptor: StateDescriptor[_ <: State, _] = - new MapStateDescriptor[K, V](stateId, mapViewTypeInfo.keyType, mapViewTypeInfo.valueType) + new MapStateDescriptor[K, V](stateId, mapViewTypeInfo.getKeyType, mapViewTypeInfo.getValueType) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala deleted file mode 100644 index 59b2426..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.dataview - -import java.lang.{Iterable => JIterable} -import java.util - -import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} -import org.apache.flink.table.dataview.ListViewTypeInfoFactory - -/** - * A [[ListView]] provides List functionality for accumulators used by user-defined aggregate - * functions [[org.apache.flink.api.common.functions.AggregateFunction]]. - * - * A [[ListView]] can be backed by a Java ArrayList or a state backend, depending on the context in - * which the aggregate function is used. - * - * At runtime [[ListView]] will be replaced by a [[org.apache.flink.table.dataview.StateListView]] - * if it is backed by a state backend. - * - * Example of an accumulator type with a [[ListView]] and an aggregate function that uses it: - * {{{ - * - * public class MyAccum { - * public ListView<String> list; - * public long count; - * } - * - * public class MyAgg extends AggregateFunction<Long, MyAccum> { - * - * @Override - * public MyAccum createAccumulator() { - * MyAccum accum = new MyAccum(); - * accum.list = new ListView<>(Types.STRING); - * accum.count = 0L; - * return accum; - * } - * - * public void accumulate(MyAccum accumulator, String id) { - * accumulator.list.add(id); - * ... ... - * accumulator.get() - * ... ... - * } - * - * @Override - * public Long getValue(MyAccum accumulator) { - * accumulator.list.add(id); - * ... ... - * accumulator.get() - * ... ... - * return accumulator.count; - * } - * } - * - * }}} - * - * @param elementTypeInfo element type information - * @tparam T element type - */ -@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) -class ListView[T]( - @transient private[flink] val elementTypeInfo: TypeInformation[T], - private[flink] val list: util.List[T]) - extends DataView { - - /** - * Creates a list view for elements of the specified type. - * - * @param elementTypeInfo The type of the list view elements. - */ - def this(elementTypeInfo: TypeInformation[T]) { - this(elementTypeInfo, new util.ArrayList[T]()) - } - - /** - * Creates a list view. - */ - def this() = this(null) - - /** - * Returns an iterable of the list view. - * - * @throws Exception Thrown if the system cannot get data. - * @return The iterable of the list or { @code null} if the list is empty. - */ - @throws[Exception] - def get: JIterable[T] = { - if (!list.isEmpty) { - list - } else { - null - } - } - - /** - * Adds the given value to the list. - * - * @throws Exception Thrown if the system cannot add data. - * @param value The element to be appended to this list view. - */ - @throws[Exception] - def add(value: T): Unit = list.add(value) - - /** - * Adds all of the elements of the specified list to this list view. - * - * @throws Exception Thrown if the system cannot add all data. - * @param list The list with the elements that will be stored in this list view. - */ - @throws[Exception] - def addAll(list: util.List[T]): Unit = this.list.addAll(list) - - /** - * Removes all of the elements from this list view. - */ - override def clear(): Unit = list.clear() - - override def equals(other: Any): Boolean = other match { - case that: ListView[T] => - list.equals(that.list) - case _ => false - } - - override def hashCode(): Int = list.hashCode() -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala deleted file mode 100644 index 9206d6a..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.dataview - -import java.lang.{Iterable => JIterable} -import java.util - -import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} -import org.apache.flink.table.dataview.MapViewTypeInfoFactory - -/** - * A [[MapView]] provides Map functionality for accumulators used by user-defined aggregate - * functions [[org.apache.flink.table.functions.AggregateFunction]]. - * - * A [[MapView]] can be backed by a Java HashMap or a state backend, depending on the context in - * which the aggregation function is used. - * - * At runtime [[MapView]] will be replaced by a [[org.apache.flink.table.dataview.StateMapView]] - * if it is backed by a state backend. - * - * Example of an accumulator type with a [[MapView]] and an aggregate function that uses it: - * {{{ - * - * public class MyAccum { - * public MapView<String, Integer> map; - * public long count; - * } - * - * public class MyAgg extends AggregateFunction<Long, MyAccum> { - * - * @Override - * public MyAccum createAccumulator() { - * MyAccum accum = new MyAccum(); - * accum.map = new MapView<>(Types.STRING, Types.INT); - * accum.count = 0L; - * return accum; - * } - * - * public void accumulate(MyAccum accumulator, String id) { - * try { - * if (!accumulator.map.contains(id)) { - * accumulator.map.put(id, 1); - * accumulator.count++; - * } - * } catch (Exception e) { - * e.printStackTrace(); - * } - * } - * - * @Override - * public Long getValue(MyAccum accumulator) { - * return accumulator.count; - * } - * } - * - * }}} - * - * @param keyTypeInfo key type information - * @param valueTypeInfo value type information - * @tparam K key type - * @tparam V value type - */ -@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]]) -class MapView[K, V]( - @transient private[flink] val keyTypeInfo: TypeInformation[K], - @transient private[flink] val valueTypeInfo: TypeInformation[V], - private[flink] val map: util.Map[K, V]) - extends DataView { - - /** - * Creates a MapView with the specified key and value types. - * - * @param keyTypeInfo The type of keys of the MapView. - * @param valueTypeInfo The type of the values of the MapView. - */ - def this(keyTypeInfo: TypeInformation[K], valueTypeInfo: TypeInformation[V]) { - this(keyTypeInfo, valueTypeInfo, new util.HashMap[K, V]()) - } - - /** - * Creates a MapView. - */ - def this() = this(null, null) - - /** - * Return the value for the specified key or { @code null } if the key is not in the map view. - * - * @param key The look up key. - * @return The value for the specified key. - * @throws Exception Thrown if the system cannot get data. - */ - @throws[Exception] - def get(key: K): V = map.get(key) - - /** - * Inserts a value for the given key into the map view. - * If the map view already contains a value for the key, the existing value is overwritten. - * - * @param key The key for which the value is inserted. - * @param value The value that is inserted for the key. - * @throws Exception Thrown if the system cannot put data. - */ - @throws[Exception] - def put(key: K, value: V): Unit = map.put(key, value) - - /** - * Inserts all mappings from the specified map to this map view. - * - * @param map The map whose entries are inserted into this map view. - * @throws Exception Thrown if the system cannot access the map. - */ - @throws[Exception] - def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map) - - /** - * Deletes the value for the given key. - * - * @param key The key for which the value is deleted. - * @throws Exception Thrown if the system cannot access the map. - */ - @throws[Exception] - def remove(key: K): Unit = map.remove(key) - - /** - * Checks if the map view contains a value for a given key. - * - * @param key The key to check. - * @return True if there exists a value for the given key, false otherwise. - * @throws Exception Thrown if the system cannot access the map. - */ - @throws[Exception] - def contains(key: K): Boolean = map.containsKey(key) - - /** - * Returns all entries of the map view. - * - * @return An iterable of all the key-value pairs in the map view. - * @throws Exception Thrown if the system cannot access the map. - */ - @throws[Exception] - def entries: JIterable[util.Map.Entry[K, V]] = map.entrySet() - - /** - * Returns all the keys in the map view. - * - * @return An iterable of all the keys in the map. - * @throws Exception Thrown if the system cannot access the map. - */ - @throws[Exception] - def keys: JIterable[K] = map.keySet() - - /** - * Returns all the values in the map view. - * - * @return An iterable of all the values in the map. - * @throws Exception Thrown if the system cannot access the map. - */ - @throws[Exception] - def values: JIterable[V] = map.values() - - /** - * Returns an iterator over all entries of the map view. - * - * @return An iterator over all the mappings in the map. - * @throws Exception Thrown if the system cannot access the map. - */ - @throws[Exception] - def iterator: util.Iterator[util.Map.Entry[K, V]] = map.entrySet().iterator() - - /** - * Removes all entries of this map. - */ - override def clear(): Unit = map.clear() - - override def equals(other: Any): Boolean = other match { - case that: MapView[K, V] => - map.equals(that.map) - case _ => false - } - - override def hashCode(): Int = map.hashCode() -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala deleted file mode 100644 index 75f9326..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.dataview - -import org.apache.flink.api.common.typeutils._ -import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializerSnapshot} -import org.apache.flink.core.memory.{DataInputView, DataOutputView} -import org.apache.flink.table.api.dataview.ListView - -/** - * A serializer for [[ListView]]. The serializer relies on an element - * serializer for the serialization of the list's elements. - * - * The serialization format for the list is as follows: four bytes for the length of the list, - * followed by the serialized representation of each element. - * - * @param listSerializer List serializer. - * @tparam T The type of element in the list. - */ -@SerialVersionUID(-2030398712359267867L) -class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]]) - extends TypeSerializer[ListView[T]] - with LegacySerializerSnapshotTransformer[ListView[T]] { - - override def isImmutableType: Boolean = false - - override def duplicate(): TypeSerializer[ListView[T]] = { - new ListViewSerializer[T](listSerializer.duplicate()) - } - - override def createInstance(): ListView[T] = { - new ListView[T] - } - - override def copy(from: ListView[T]): ListView[T] = { - new ListView[T](null, listSerializer.copy(from.list)) - } - - override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) - - override def getLength: Int = -1 - - override def serialize(record: ListView[T], target: DataOutputView): Unit = { - listSerializer.serialize(record.list, target) - } - - override def deserialize(source: DataInputView): ListView[T] = { - new ListView[T](null, listSerializer.deserialize(source)) - } - - override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = - deserialize(source) - - override def copy(source: DataInputView, target: DataOutputView): Unit = - listSerializer.copy(source, target) - - override def hashCode(): Int = listSerializer.hashCode() - - override def equals(obj: Any): Boolean = - listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer) - - override def snapshotConfiguration(): ListViewSerializerSnapshot[T] = - new ListViewSerializerSnapshot[T](this) - - /** - * We need to override this as a [[LegacySerializerSnapshotTransformer]] - * because in Flink 1.6.x and below, this serializer was incorrectly returning - * directly the snapshot of the nested list serializer as its own snapshot. - * - * <p>This method transforms the incorrect list serializer snapshot - * to be a proper [[ListViewSerializerSnapshot]]. - */ - override def transformLegacySerializerSnapshot[U]( - legacySnapshot: TypeSerializerSnapshot[U] - ): TypeSerializerSnapshot[ListView[T]] = { - - legacySnapshot match { - case correctSnapshot: ListViewSerializerSnapshot[T] => - correctSnapshot - - case legacySnapshot: CollectionSerializerConfigSnapshot[java.util.List[T], T] => - // first, transform the incorrect list serializer's snapshot - // into a proper ListSerializerSnapshot - val transformedNestedListSerializerSnapshot = new ListSerializerSnapshot[T] - CompositeTypeSerializerUtil.setNestedSerializersSnapshots( - transformedNestedListSerializerSnapshot, - legacySnapshot.getSingleNestedSerializerAndConfig.f1) - - // then, wrap the transformed ListSerializerSnapshot - // as a nested snapshot in the final resulting ListViewSerializerSnapshot - val transformedListViewSerializerSnapshot = new ListViewSerializerSnapshot[T]() - CompositeTypeSerializerUtil.setNestedSerializersSnapshots( - transformedListViewSerializerSnapshot, - transformedNestedListSerializerSnapshot) - - transformedListViewSerializerSnapshot - } - } - - def getListSerializer: TypeSerializer[java.util.List[T]] = listSerializer -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala deleted file mode 100644 index a10b675..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.dataview - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.common.typeutils.base.ListSerializer -import org.apache.flink.table.api.dataview.ListView - -/** - * [[ListView]] type information. - * - * @param elementType element type information - * @tparam T element type - */ -class ListViewTypeInfo[T](val elementType: TypeInformation[T]) - extends TypeInformation[ListView[T]] { - - override def isBasicType: Boolean = false - - override def isTupleType: Boolean = false - - override def getArity: Int = 1 - - override def getTotalFields: Int = 1 - - override def getTypeClass: Class[ListView[T]] = classOf[ListView[T]] - - override def isKeyType: Boolean = false - - override def createSerializer(config: ExecutionConfig): TypeSerializer[ListView[T]] = { - val typeSer = elementType.createSerializer(config) - new ListViewSerializer[T](new ListSerializer[T](typeSer)) - } - - override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass - - override def hashCode(): Int = 31 * elementType.hashCode - - override def equals(obj: Any): Boolean = canEqual(obj) && { - obj match { - case other: ListViewTypeInfo[T] => - elementType.equals(other.elementType) - case _ => false - } - } - - override def toString: String = s"ListView<$elementType>" -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala deleted file mode 100644 index eda6cb9..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfoFactory.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.dataview - -import java.lang.reflect.Type -import java.util - -import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation} -import org.apache.flink.api.java.typeutils.GenericTypeInfo -import org.apache.flink.table.api.dataview.ListView - -class ListViewTypeInfoFactory[T] extends TypeInfoFactory[ListView[T]] { - - override def createTypeInfo( - t: Type, - genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[ListView[T]] = { - - var elementType = genericParameters.get("T") - - if (elementType == null) { - // we might can get the elementType later from the ListView constructor - elementType = new GenericTypeInfo(classOf[Any]) - } - - new ListViewTypeInfo[T](elementType.asInstanceOf[TypeInformation[T]]) - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala deleted file mode 100644 index 9947c35..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.dataview - -import org.apache.flink.api.common.typeutils._ -import org.apache.flink.api.common.typeutils.base.{MapSerializerConfigSnapshot, MapSerializerSnapshot} -import org.apache.flink.core.memory.{DataInputView, DataOutputView} -import org.apache.flink.table.api.dataview.MapView - -/** - * A serializer for [[MapView]]. The serializer relies on a key serializer and a value - * serializer for the serialization of the map's key-value pairs. - * - * The serialization format for the map is as follows: four bytes for the length of the map, - * followed by the serialized representation of each key-value pair. To allow null values, - * each value is prefixed by a null marker. - * - * @param mapSerializer Map serializer. - * @tparam K The type of the keys in the map. - * @tparam V The type of the values in the map. - */ -@SerialVersionUID(-9007142882049098705L) -class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K, V]]) - extends TypeSerializer[MapView[K, V]] - with LegacySerializerSnapshotTransformer[MapView[K, V]] { - - override def isImmutableType: Boolean = false - - override def duplicate(): TypeSerializer[MapView[K, V]] = - new MapViewSerializer[K, V](mapSerializer.duplicate()) - - override def createInstance(): MapView[K, V] = { - new MapView[K, V]() - } - - override def copy(from: MapView[K, V]): MapView[K, V] = { - new MapView[K, V](null, null, mapSerializer.copy(from.map)) - } - - override def copy(from: MapView[K, V], reuse: MapView[K, V]): MapView[K, V] = copy(from) - - override def getLength: Int = -1 // var length - - override def serialize(record: MapView[K, V], target: DataOutputView): Unit = { - mapSerializer.serialize(record.map, target) - } - - override def deserialize(source: DataInputView): MapView[K, V] = { - new MapView[K, V](null, null, mapSerializer.deserialize(source)) - } - - override def deserialize(reuse: MapView[K, V], source: DataInputView): MapView[K, V] = - deserialize(source) - - override def copy(source: DataInputView, target: DataOutputView): Unit = - mapSerializer.copy(source, target) - - override def hashCode(): Int = mapSerializer.hashCode() - - override def equals(obj: Any): Boolean = - mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer) - - override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] = - new MapViewSerializerSnapshot[K, V](this) - - /** - * We need to override this as a [[LegacySerializerSnapshotTransformer]] - * because in Flink 1.6.x and below, this serializer was incorrectly returning - * directly the snapshot of the nested map serializer as its own snapshot. - * - * <p>This method transforms the incorrect map serializer snapshot - * to be a proper [[MapViewSerializerSnapshot]]. - */ - override def transformLegacySerializerSnapshot[U]( - legacySnapshot: TypeSerializerSnapshot[U] - ): TypeSerializerSnapshot[MapView[K, V]] = { - - legacySnapshot match { - case correctSnapshot: MapViewSerializerSnapshot[K, V] => - correctSnapshot - - case legacySnapshot: MapSerializerConfigSnapshot[K, V] => - // first, transform the incorrect map serializer's snapshot - // into a proper ListSerializerSnapshot - val transformedNestedMapSerializerSnapshot = - new MapSerializerSnapshot[K, V] - CompositeTypeSerializerUtil.setNestedSerializersSnapshots( - transformedNestedMapSerializerSnapshot, - legacySnapshot.getNestedSerializersAndConfigs.get(0).f1, - legacySnapshot.getNestedSerializersAndConfigs.get(1).f1 - ) - - // then, wrap the transformed MapSerializerSnapshot - // as a nested snapshot in the final resulting MapViewSerializerSnapshot - val transformedMapViewSerializerSnapshot = - new MapViewSerializerSnapshot[K, V]() - CompositeTypeSerializerUtil.setNestedSerializersSnapshots( - transformedMapViewSerializerSnapshot, - transformedNestedMapSerializerSnapshot - ) - - transformedMapViewSerializerSnapshot - } - } - - def getMapSerializer: TypeSerializer[java.util.Map[K, V]] = mapSerializer -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala deleted file mode 100644 index ec5c222..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.dataview - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.common.typeutils.base.MapSerializer -import org.apache.flink.table.api.dataview.MapView - -/** - * [[MapView]] type information. - * - * @param keyType key type information - * @param valueType value type information - * @tparam K key type - * @tparam V value type - */ -class MapViewTypeInfo[K, V]( - val keyType: TypeInformation[K], - val valueType: TypeInformation[V]) - extends TypeInformation[MapView[K, V]] { - - override def isBasicType = false - - override def isTupleType = false - - override def getArity = 1 - - override def getTotalFields = 1 - - override def getTypeClass: Class[MapView[K, V]] = classOf[MapView[K, V]] - - override def isKeyType: Boolean = false - - override def createSerializer(config: ExecutionConfig): TypeSerializer[MapView[K, V]] = { - val keySer = keyType.createSerializer(config) - val valueSer = valueType.createSerializer(config) - new MapViewSerializer[K, V](new MapSerializer[K, V](keySer, valueSer)) - } - - override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass - - override def hashCode(): Int = 31 * keyType.hashCode + valueType.hashCode - - override def equals(obj: Any): Boolean = canEqual(obj) && { - obj match { - case other: MapViewTypeInfo[_, _] => - keyType.equals(other.keyType) && - valueType.equals(other.valueType) - case _ => false - } - } - - override def toString: String = s"MapView<$keyType, $valueType>" -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala deleted file mode 100644 index 33c3ffe..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfoFactory.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.dataview - -import java.lang.reflect.Type -import java.util - -import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation} -import org.apache.flink.api.java.typeutils.GenericTypeInfo -import org.apache.flink.table.api.dataview.MapView - -class MapViewTypeInfoFactory[K, V] extends TypeInfoFactory[MapView[K, V]] { - - override def createTypeInfo( - t: Type, - genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[MapView[K, V]] = { - - var keyType = genericParameters.get("K") - var valueType = genericParameters.get("V") - - if (keyType == null) { - // we might can get the keyType later from the MapView constructor - keyType = new GenericTypeInfo(classOf[Any]) - } - - if (valueType == null) { - // we might can get the valueType later from the MapView constructor - valueType = new GenericTypeInfo(classOf[Any]) - } - - new MapViewTypeInfo[K, V]( - keyType.asInstanceOf[TypeInformation[K]], - valueType.asInstanceOf[TypeInformation[V]]) - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala index 5186d66..42ce9a5 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.functions.aggfunctions import java.lang.{Iterable => JIterable} import java.util - import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils._ import org.apache.flink.table.api.dataview.MapView diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index f757189..cdb2479 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -19,16 +19,6 @@ package org.apache.flink.table.functions.utils -import java.lang.reflect.{Method, Modifier} -import java.lang.{Integer => JInt, Long => JLong} -import java.sql.{Date, Time, Timestamp} -import java.util - -import com.google.common.primitives.Primitives -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency -import org.apache.calcite.sql.`type`._ -import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, SqlOperandCountRange, SqlOperator} import org.apache.flink.api.common.functions.InvalidTypesException import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType @@ -36,11 +26,21 @@ import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtract import org.apache.flink.table.api.dataview._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.dataview._ +import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo} import org.apache.flink.table.functions._ import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl import org.apache.flink.table.typeutils.FieldInfoUtils -import org.apache.flink.util.InstantiationUtil + +import com.google.common.primitives.Primitives +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, SqlOperandCountRange, SqlOperator} + +import java.lang.reflect.{Method, Modifier} +import java.lang.{Integer => JInt, Long => JLong} +import java.sql.{Date, Time, Timestamp} +import java.util import scala.collection.mutable @@ -474,8 +474,8 @@ object UserDefinedFunctionUtils { case map: MapViewTypeInfo[_, _] => val mapView = field.get(acc).asInstanceOf[MapView[_, _]] if (mapView != null) { - val keyTypeInfo = mapView.keyTypeInfo - val valueTypeInfo = mapView.valueTypeInfo + val keyTypeInfo = mapView.keyType + val valueTypeInfo = mapView.valueType val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) { new MapViewTypeInfo(keyTypeInfo, valueTypeInfo) } else { @@ -499,7 +499,7 @@ object UserDefinedFunctionUtils { case list: ListViewTypeInfo[_] => val listView = field.get(acc).asInstanceOf[ListView[_]] if (listView != null) { - val elementTypeInfo = listView.elementTypeInfo + val elementTypeInfo = listView.elementType val newTypeInfo = if (elementTypeInfo != null) { new ListViewTypeInfo(elementTypeInfo) } else { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala index 3f70bce..227551e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/ListViewSerializerTest.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.dataview import java.lang.Long import java.util.Random - import org.apache.flink.api.common.typeutils.base.{ListSerializer, LongSerializer} import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer} import org.apache.flink.table.api.dataview.ListView diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala index 15f9b02..03ab7f1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/dataview/MapViewSerializerTest.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.dataview import java.lang.Long import java.util.Random - import org.apache.flink.api.common.typeutils.base.{LongSerializer, MapSerializer, StringSerializer} import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer} import org.apache.flink.table.api.dataview.MapView diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java index 5b79e0f..d215a03 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerKeyStateDataViewStore.java @@ -25,8 +25,6 @@ import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.table.typeutils.ListViewTypeInfo; -import org.apache.flink.table.typeutils.MapViewTypeInfo; /** * Default implementation of StateDataViewStore that currently forwards state registration diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java index ec0f675..fcfeb68 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/PerWindowStateDataViewStore.java @@ -30,8 +30,6 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalValueState; -import org.apache.flink.table.typeutils.ListViewTypeInfo; -import org.apache.flink.table.typeutils.MapViewTypeInfo; /** * An implementation of StateDataViewStore for window aggregates which forward the state diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java index 11abeb8..aa1fb2f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataview/StateDataViewStore.java @@ -19,8 +19,6 @@ package org.apache.flink.table.dataview; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.table.typeutils.ListViewTypeInfo; -import org.apache.flink.table.typeutils.MapViewTypeInfo; /** * This interface contains methods for registering {@link StateDataView} with a managed store. diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java index 26d4dec..ecdbaa3 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.dataformat.BaseRow; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.dataformat.Decimal; +import org.apache.flink.table.dataview.MapViewTypeInfo; import org.apache.flink.table.functions.AggregateFunctionDefinition; import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.types.logical.DecimalType; @@ -39,7 +40,6 @@ import org.apache.flink.table.typeutils.BaseRowTypeInfo; import org.apache.flink.table.typeutils.BigDecimalTypeInfo; import org.apache.flink.table.typeutils.BinaryStringTypeInfo; import org.apache.flink.table.typeutils.DecimalTypeInfo; -import org.apache.flink.table.typeutils.MapViewTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions;