This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 03530b6085d [FLINK-38090][table] Align data type of inline structured types for TableResult.collect()
03530b6085d is described below

commit 03530b6085deb016a61111dbca9b92825cc6cf7a
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 14 12:39:10 2025 +0200

    [FLINK-38090][table] Align data type of inline structured types for TableResult.collect()

    This closes #26784.
---
 .../flink/table/catalog/DataTypeFactoryImpl.java   |  12 +-
 .../java/org/apache/flink/table/api/DataTypes.java | 186 +++++----
 .../flink/table/catalog/DataTypeFactory.java       |   5 +
 .../types/extraction/BaseMappingExtractor.java     |   2 +-
 .../table/types/extraction/DataTypeExtractor.java  |  33 +-
 .../table/types/extraction/DataTypeTemplate.java   |   4 +-
 .../table/types/inference/TypeTransformations.java |   4 +-
 ...ion.java => ConversionClassTransformation.java} |   6 +-
 .../flink/table/types/logical/StructuredType.java  |   3 +-
 .../flink/table/types/utils/DataTypeUtils.java     | 453 ++++++++++++---------
 .../flink/table/types/utils/DataTypeUtilsTest.java |  38 ++
 .../table/planner/connectors/DynamicSinkUtils.java |  17 +-
 .../nodes/exec/serde/DataTypeJsonSerializer.java   |  27 +-
 .../functions/StructuredFunctionsITCase.java       |  79 +++-
 14 files changed, 547 insertions(+), 322 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
index b623ea16df2..1fce774a81d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
@@ -19,7 +19,6 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -64,6 +63,11 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
         this.serializerConfig = createSerializerConfig(classLoader, config, serializerConfig);
     }
 
+    @Override
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
     @Override
     public DataType createDataType(AbstractDataType<?> abstractDataType) {
         if (abstractDataType instanceof DataType) {
@@ -124,9 +128,9 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
     // --------------------------------------------------------------------------------------------
 
     /**
-     * Creates a lazy {@link ExecutionConfig} that contains options for {@link TypeSerializer}s with
-     * information from existing {@link ExecutionConfig} (if available) enriched with table {@link
-     * ReadableConfig}.
+     * Creates a lazy {@link SerializerConfig} that contains options for {@link TypeSerializer}s
+     * with information from existing {@link SerializerConfig} (if available) enriched with table
+     * {@link ReadableConfig}.
      */
     private static Supplier<SerializerConfig> createSerializerConfig(
             ClassLoader classLoader, ReadableConfig config, SerializerConfig serializerConfig) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 74683fe61c4..d9b514cd158 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -129,7 +129,7 @@ public final class DataTypes {
      * // returns TIMESTAMP(9)
      * of(java.time.LocalDateTime.class)
      *
-     * // returns an anonymous, unregistered structured type
+     * // returns an unregistered (i.e. declared inline) structured type
      * // that is deeply integrated into the API compared to opaque RAW types
      * class User {
      *
@@ -619,16 +619,11 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of an array of elements with same subtype.
+     * Unresolved {@link #ARRAY(DataType)}.
      *
-     * <p>Compared to the SQL standard, the maximum cardinality of an array cannot be specified but
-     * is fixed at {@link Integer#MAX_VALUE}. Also, any valid type is supported as a subtype.
-     *
-     * <p>Note: Compared to {@link #ARRAY(DataType)}, this method produces an {@link
-     * UnresolvedDataType}. In most of the cases, the {@link UnresolvedDataType} will be
-     * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
-     *
-     * @see ArrayType
+     * <p>Compared to {@link #ARRAY(DataType)}, this method produces an {@link UnresolvedDataType}.
+     * In most of the cases, the {@link UnresolvedDataType} will be automatically resolved by the
+     * API. At other locations, a {@link DataTypeFactory} is provided.
      */
     public static UnresolvedDataType ARRAY(AbstractDataType<?> elementDataType) {
         Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
@@ -654,18 +649,11 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of a multiset (=bag). Unlike a set, it allows for multiple instances for
-     * each of its elements with a common subtype. Each unique value (including {@code NULL}) is
-     * mapped to some multiplicity.
+     * Unresolved {@link #MULTISET(DataType)}.
      *
-     * <p>There is no restriction of element types; it is the responsibility of the user to ensure
-     * uniqueness.
-     *
-     * <p>Note: Compared to {@link #MULTISET(DataType)}, this method produces an {@link
+     * <p>Compared to {@link #MULTISET(DataType)}, this method produces an {@link
      * UnresolvedDataType}. In most of the cases, the {@link UnresolvedDataType} will be
      * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
-     *
-     * @see MultisetType
      */
     public static UnresolvedDataType MULTISET(AbstractDataType<?> elementDataType) {
         Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
@@ -694,18 +682,11 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of an associative array that maps keys (including {@code NULL}) to
-     * values (including {@code NULL}). A map cannot contain duplicate keys; each key can map to at
-     * most one value.
-     *
-     * <p>There is no restriction of key types; it is the responsibility of the user to ensure
-     * uniqueness. The map type is an extension to the SQL standard.
+     * Unresolved {@link #MAP(DataType, DataType)}.
      *
-     * <p>Note: Compared to {@link #MAP(DataType, DataType)}, this method produces an {@link
+     * <p>Compared to {@link #MAP(DataType, DataType)}, this method produces an {@link
      * UnresolvedDataType}. In most of the cases, the {@link UnresolvedDataType} will be
      * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
-     *
-     * @see MapType
     */
     public static UnresolvedDataType MAP(
             AbstractDataType<?> keyDataType, AbstractDataType<?> valueDataType) {
@@ -734,6 +715,7 @@ public final class DataTypes {
      * @see RowType
      */
     public static DataType ROW(Field... fields) {
+        checkFields(fields);
         final List<RowField> logicalFields =
                 Stream.of(fields)
                         .map(
@@ -777,27 +759,17 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of a sequence of fields. A field consists of a field name, field type,
-     * and an optional description. The most specific type of a row of a table is a row type. In
-     * this case, each column of the row corresponds to the field of the row type that has the same
-     * ordinal position as the column.
-     *
-     * <p>Compared to the SQL standard, an optional field description simplifies the handling with
-     * complex structures.
+     * Unresolved {@link #ROW(Field...)}.
      *
      * <p>Use {@link #FIELD(String, AbstractDataType)} or {@link #FIELD(String, AbstractDataType,
      * String)} to construct fields.
      *
-     * <p>Note: Compared to {@link #ROW(Field...)}, this method produces an {@link
-     * UnresolvedDataType} with {@link UnresolvedField}s. In most of the cases, the {@link
-     * UnresolvedDataType} will be automatically resolved by the API. At other locations, a {@link
-     * DataTypeFactory} is provided.
-     *
-     * @see RowType
+     * <p>Compared to {@link #ROW(Field...)}, this method produces an {@link UnresolvedDataType}
+     * with {@link UnresolvedField}s. In most of the cases, the {@link UnresolvedDataType} will be
+     * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
      */
     public static UnresolvedDataType ROW(AbstractField... fields) {
-        Stream.of(fields)
-                .forEach(f -> Preconditions.checkNotNull(f, "Field definition must not be null."));
+        checkFields(fields);
         return new UnresolvedDataType(
                 () ->
                         String.format(
@@ -821,10 +793,11 @@ public final class DataTypes {
     }
 
     /**
-     * Data type of a sequence of fields.
+     * Unresolved {@link #ROW(DataType...)}.
     *
-     * <p>This is shortcut for {@link #ROW(AbstractField...)} where the field names will be
-     * generated using {@code f0, f1, f2, ...}.
+     * <p>Compared to {@link #ROW(DataType...)}, this method produces an {@link UnresolvedDataType}
+     * with {@link UnresolvedField}s. In most of the cases, the {@link UnresolvedDataType} will be
+     * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
      */
     public static UnresolvedDataType ROW(AbstractDataType<?>... fieldDataTypes) {
         return ROW(
@@ -964,50 +937,99 @@ public final class DataTypes {
      * @see DataTypes#of(Class)
      * @see StructuredType
      */
-    public static <T> DataType STRUCTURED(Class<T> implementationClass, Field... fields) {
+    public static DataType STRUCTURED(Class<?> implementationClass, Field... fields) {
+        checkFields(fields);
         // some basic validation of the class to prevent common mistakes
         validateStructuredClass(implementationClass);
         return buildStructuredType(StructuredType.newBuilder(implementationClass), fields);
     }
 
     /**
-     * Data type of a user-defined object structured type. Structured types are identified by a
-     * class name and contain zero, one or more attributes. Each attribute has a name, a type, and
-     * an optional description. A type cannot be defined in such a way that one of its attribute
-     * types (transitively) refers to itself.
+     * Unresolved {@link #STRUCTURED(Class, Field...)}.
     *
-     * <p>Compared to {@link #ROW(Field...)}, which may also be considered a "struct-like" type,
-     * structured types are distinguishable even if they contain the same set of fields. For
-     * example, "Visit(amount DOUBLE)" is distinct from "Interaction(amount DOUBLE)" due its
-     * identifier.
-     *
-     * <p>This method allows for manually constructing an inline structured type. This is useful in
-     * cases where the reflective extraction using {@link DataTypes#of(Class)} is not applicable.
-     * However, {@link DataTypes#of(Class)} is the recommended approach for creating inline
-     * structured types, as it also considers {@link DataTypeHint}s.
+     * <p>Use {@link #FIELD(String, AbstractDataType)} or {@link #FIELD(String, AbstractDataType,
+     * String)} to construct fields.
     *
-     * <p>Structured types are internally converted by the system into suitable data structures.
-     * Serialization and equality checks (e.g. {@code hashCode/equals}) are managed by the system
-     * based on the logical type.
+     * <p>Compared to {@link #STRUCTURED(Class, Field...)}, this method produces an {@link
+     * UnresolvedDataType} with {@link UnresolvedField}s. In most of the cases, the {@link
+     * UnresolvedDataType} will be automatically resolved by the API. At other locations, a {@link
+     * DataTypeFactory} is provided.
+     */
+    public static UnresolvedDataType STRUCTURED(
+            Class<?> implementationClass, AbstractField... fields) {
+        checkFields(fields);
+        return new UnresolvedDataType(
+                () ->
+                        String.format(
+                                StructuredType.INLINE_FORMAT,
+                                implementationClass.getName(),
+                                Stream.of(fields)
+                                        .map(Object::toString)
+                                        .collect(Collectors.joining(", "))),
+                factory -> {
+                    final Field[] fieldsArray =
+                            Stream.of(fields)
+                                    .map(
+                                            f ->
+                                                    new Field(
+                                                            f.name,
+                                                            factory.createDataType(
+                                                                    f.getAbstractDataType()),
+                                                            f.description))
+                                    .toArray(Field[]::new);
+                    return STRUCTURED(implementationClass, fieldsArray);
+                });
+    }
+
+    /**
+     * Data type of a user-defined object structured type where the given class name is not in the
+     * classpath.
     *
-     * <p>If an optional implementation class is provided, the system will convert a structured
-     * object to a JVM object at the edges of the table ecosystem (e.g. when bridging to a function
-     * or connector). The implementation class must provide either a zero-argument constructor or a
-     * full constructor that assigns all attributes. The class name does not need to be resolvable
-     * in the classpath; it may be used solely to distinguish between objects with identical
-     * attribute sets. However, in Table API and UDF calls, the system will attempt to resolve the
-     * class name to an actual implementation class. If resolution fails, {@link Row} is used as a
-     * fallback.
+     * @see #STRUCTURED(Class, Field...) for more information on structured types.
+     */
+    public static DataType STRUCTURED(String className, Field... fields) {
+        return buildStructuredType(StructuredType.newBuilder(className), fields);
+    }
+
+    /**
+     * Unresolved {@link #STRUCTURED(String, Field...)}.
     *
+     * <p>Use {@link #FIELD(String, AbstractDataType)} or {@link #FIELD(String, AbstractDataType,
+     * String)} to construct fields.
     *
-     * <p>Note: The caller of this method must ensure that the {@link DataType#getConversionClass()}
-     * of each field matches the corresponding attribute in the implementation class. Otherwise, a
-     * runtime exception may be thrown.
+     * <p>Compared to {@link #STRUCTURED(Class, Field...)}, this method produces an {@link
+     * UnresolvedDataType} with {@link UnresolvedField}s. In most of the cases, the {@link
+     * UnresolvedDataType} will be automatically resolved by the API. At other locations, a {@link
+     * DataTypeFactory} is provided.
      */
-    public static <T> DataType STRUCTURED(String className, Field... fields) {
-        return buildStructuredType(StructuredType.newBuilder(className), fields);
+    public static UnresolvedDataType STRUCTURED(String className, AbstractField... fields) {
+        checkFields(fields);
+        return new UnresolvedDataType(
+                () ->
+                        String.format(
+                                StructuredType.INLINE_FORMAT,
+                                className,
+                                Stream.of(fields)
+                                        .map(Object::toString)
+                                        .collect(Collectors.joining(", "))),
+                factory -> {
+                    final Field[] fieldsArray =
+                            Stream.of(fields)
+                                    .map(
+                                            f ->
+                                                    new Field(
+                                                            f.name,
+                                                            factory.createDataType(
+                                                                    f.getAbstractDataType()),
+                                                            f.description))
+                                    .toArray(Field[]::new);
+                    return STRUCTURED(className, fieldsArray);
+                });
+    }
+
+    private static void checkFields(AbstractField... fields) {
+        Stream.of(fields)
+                .forEach(f -> Preconditions.checkNotNull(f, "Field definition must not be null."));
     }
 
     private static DataType buildStructuredType(StructuredType.Builder builder, Field... fields) {
@@ -1130,11 +1152,11 @@ public final class DataTypes {
     }
 
     /** Field definition with field name, data type, and a description. */
-    public static Field FIELD(String name, DataType dataType, String description) {
+    public static Field FIELD(String name, DataType dataType, @Nullable String description) {
         return new Field(
                 Preconditions.checkNotNull(name, "Field name must not be null."),
                 Preconditions.checkNotNull(dataType, "Field data type must not be null."),
-                Preconditions.checkNotNull(description, "Field description must not be null."));
+                description);
     }
 
     /**
@@ -1157,11 +1179,11 @@ public final class DataTypes {
      * UnresolvedField} that can contain an {@link UnresolvedDataType}.
      */
     public static UnresolvedField FIELD(
-            String name, AbstractDataType<?> fieldDataType, String description) {
+            String name, AbstractDataType<?> fieldDataType, @Nullable String description) {
         return new UnresolvedField(
                 Preconditions.checkNotNull(name, "Field name must not be null."),
                 Preconditions.checkNotNull(fieldDataType, "Field data type must not be null."),
-                Preconditions.checkNotNull(description, "Field description must not be null."));
+                description);
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
index 5b364531cb5..c71990c393c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
@@ -41,6 +41,11 @@ import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter;
 @PublicEvolving
 public interface DataTypeFactory {
 
+    /** Returns the class loader of the current session. */
+    default ClassLoader getClassLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+
     /**
      * Creates a type out of an {@link AbstractDataType}.
      *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
index b8094957f83..c8de7463ed0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
@@ -610,7 +610,7 @@ abstract class BaseMappingExtractor {
                     arg.pos);
         }
         if (argumentHint != null) {
-            final DataTypeTemplate template = DataTypeTemplate.fromAnnotation(argumentHint, null);
+            final DataTypeTemplate template = DataTypeTemplate.fromAnnotation(argumentHint);
             if (template.inputGroup != null) {
                 return Optional.of(FunctionArgumentTemplate.ofInputGroup(template.inputGroup));
             }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index ad734649a96..a3cabcc660b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -107,6 +107,10 @@ public final class DataTypeExtractor {
         this.contextExplanation = contextExplanation;
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Methods that extract a data type from a JVM Type without any prior information
+    // --------------------------------------------------------------------------------------------
+
     /** Extracts a data type from a type without considering surrounding classes or templates. */
     public static DataType extractFromType(DataTypeFactory typeFactory, Type type) {
         return extractDataTypeWithClassContext(
@@ -114,7 +118,7 @@ public final class DataTypeExtractor {
     }
 
     /** Extracts a data type from a type without considering surrounding classes but templates. */
-    public static DataType extractFromType(
+    static DataType extractFromType(
             DataTypeFactory typeFactory, DataTypeTemplate template, Type type) {
         return extractDataTypeWithClassContext(typeFactory, template, null, type, "");
     }
@@ -239,6 +243,31 @@ public final class DataTypeExtractor {
                         method.getName(), baseClass.getName()));
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Methods that extract a data type from a JVM Class with prior logical information
+    // --------------------------------------------------------------------------------------------
+
+    public static DataType extractFromStructuredClass(
+            DataTypeFactory typeFactory, Class<?> implementationClass) {
+        final DataType dataType =
+                extractDataTypeWithClassContext(
+                        typeFactory,
+                        DataTypeTemplate.fromDefaults(),
+                        implementationClass.getEnclosingClass(),
+                        implementationClass,
+                        "");
+        if (!dataType.getLogicalType().is(LogicalTypeRoot.STRUCTURED_TYPE)) {
+            throw extractionError(
+                    "Structured data type expected for class '%s' but was: %s",
+                    implementationClass.getName(), dataType);
+        }
+        return dataType;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Supporting methods
+    // --------------------------------------------------------------------------------------------
+
     private static DataType extractDataTypeWithClassContext(
             DataTypeFactory typeFactory,
             DataTypeTemplate outerTemplate,
@@ -255,8 +284,6 @@ public final class DataTypeExtractor {
         return extractor.extractDataTypeOrRaw(outerTemplate, typeHierarchy, type);
     }
 
-    // --------------------------------------------------------------------------------------------
-
     private DataType extractDataTypeOrRaw(
             DataTypeTemplate outerTemplate, List<Type> typeHierarchy, Type type) {
         // best effort resolution of type variables, the resolved type can still be a variable
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java
index 08426874ec2..785c5aae5f6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java
@@ -119,8 +119,8 @@ final class DataTypeTemplate {
      * Creates an instance from the given {@link ArgumentHint} with a resolved data type if
     * available.
      */
-    static DataTypeTemplate fromAnnotation(ArgumentHint argumentHint, @Nullable DataType dataType) {
-        return fromAnnotation(argumentHint.type(), dataType);
+    static DataTypeTemplate fromAnnotation(ArgumentHint argumentHint) {
+        return fromAnnotation(argumentHint.type(), null);
     }
 
     /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
index 28afb9a37d3..bdd18459b44 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.types.inference;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.transforms.DataTypeConversionClassTransformation;
+import org.apache.flink.table.types.inference.transforms.ConversionClassTransformation;
 import org.apache.flink.table.types.inference.transforms.LegacyRawTypeTransformation;
 import org.apache.flink.table.types.inference.transforms.LegacyToNonLegacyTransformation;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -55,7 +55,7 @@ public final class TypeTransformations {
         conversions.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, Timestamp.class);
         conversions.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Time.class);
         conversions.put(LogicalTypeRoot.DATE, Date.class);
-        return new DataTypeConversionClassTransformation(conversions);
+        return new ConversionClassTransformation(conversions);
     }
 
     /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/DataTypeConversionClassTransformation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/ConversionClassTransformation.java
similarity index 87%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/DataTypeConversionClassTransformation.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/ConversionClassTransformation.java
index ff3416d1ce1..7b9b76153ba 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/DataTypeConversionClassTransformation.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/ConversionClassTransformation.java
@@ -27,16 +27,16 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import java.util.Map;
 
 /**
- * This type transformation transforms the specified data types to a new one with the expected
+ * This type transformation transforms the specified {@link DataType} to a new one with the expected
  * conversion class. The mapping from data type to conversion class is defined by the constructor
  * parameter {@link #conversions} map that maps from type root to the expected conversion class.
  */
 @Internal
-public class DataTypeConversionClassTransformation implements TypeTransformation {
+public class ConversionClassTransformation implements TypeTransformation {
 
     private final Map<LogicalTypeRoot, Class<?>> conversions;
 
-    public DataTypeConversionClassTransformation(Map<LogicalTypeRoot, Class<?>> conversions) {
+    public ConversionClassTransformation(Map<LogicalTypeRoot, Class<?>> conversions) {
         this.conversions = conversions;
     }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
index 2fe7549700e..8d1f271026e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -110,12 +110,11 @@ public final class StructuredType extends UserDefinedType {
 
     public static final String CATALOG_FORMAT = "%s";
     public static final String INLINE_FORMAT = "STRUCTURED<'%s', %s>";
+    public static final Class<?> FALLBACK_CONVERSION = Row.class;
 
     private static final Set<String> INPUT_OUTPUT_CONVERSION =
             conversionSet(Row.class.getName(), RowData.class.getName());
 
-    private static final Class<?> FALLBACK_CONVERSION = Row.class;
-
     /** Defines an attribute of a {@link StructuredType}. */
     @PublicEvolving
     public static final class StructuredAttribute implements Serializable {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 1745d4eebd2..6ca310896c1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -21,18 +21,19 @@ package org.apache.flink.table.types.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.DataTypeVisitor;
 import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.extraction.DataTypeExtractor;
 import org.apache.flink.table.types.inference.TypeTransformation;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DistinctType;
@@ -40,6 +41,7 @@ import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
@@ -49,7 +51,6 @@ import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -57,6 +58,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -76,47 +78,53 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInte
 @Internal
 public final class DataTypeUtils {
 
+    // --------------------------------------------------------------------------------------------
+    // Time attribute utilities
+    // --------------------------------------------------------------------------------------------
+
     /**
-     * @deprecated Use the {@link Projection} type
-     * @see Projection#project(DataType)
+     * Removes time attributes from the {@link DataType}. As everywhere else in the code base, this
+     * method does not support nested time attributes for now.
      */
-    @Deprecated
-    public static DataType projectRow(DataType dataType, int[][] indexPaths) {
-        return Projection.of(indexPaths).project(dataType);
+    public static DataType removeTimeAttribute(DataType dataType) {
+        final LogicalType type = dataType.getLogicalType();
+        if (type.is(LogicalTypeFamily.TIMESTAMP)) {
+            return replaceLogicalType(dataType, removeTimeAttributes(type));
+        }
+        return dataType;
+    }
+
+    /** Returns a PROCTIME data type. */
+    public static DataType createProctimeDataType() {
+        return new AtomicDataType(new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3));
     }
 
     /**
-     * @deprecated Use the {@link Projection} type
-     * @see Projection#project(DataType)
+     * {@link ResolvedSchema#toPhysicalRowDataType()} erases time attributes. This method keeps them
+     * during conversion for very specific use cases mostly in Table API.
      */
-    @Deprecated
-    public static DataType projectRow(DataType dataType, int[] indexPaths) {
-        return Projection.of(indexPaths).project(dataType);
+    public static DataType fromResolvedSchemaPreservingTimeAttributes(
+            ResolvedSchema resolvedSchema) {
+        final List<String> fieldNames = resolvedSchema.getColumnNames();
+        final List<DataType> fieldTypes = resolvedSchema.getColumnDataTypes();
+        return DataTypes.ROW(
+                        IntStream.range(0, fieldNames.size())
+                                .mapToObj(
+                                        pos ->
+                                                DataTypes.FIELD(
+                                                        fieldNames.get(pos), fieldTypes.get(pos)))
+                                .collect(Collectors.toList()))
+                .notNull();
     }
 
-    /** Removes a string prefix from the fields of the given row data type. */
-    public static DataType stripRowPrefix(DataType dataType, String prefix) {
-        Preconditions.checkArgument(dataType.getLogicalType().is(ROW), "Row data type expected.");
-        final RowType rowType = (RowType) dataType.getLogicalType();
-        final List<String> newFieldNames =
-                rowType.getFieldNames().stream()
-                        .map(
-                                s -> {
-                                    if (s.startsWith(prefix)) {
-                                        return s.substring(prefix.length());
-                                    }
-                                    return s;
-                                })
-                        .collect(Collectors.toList());
-        final LogicalType newRowType = LogicalTypeUtils.renameRowFields(rowType, newFieldNames);
-        return new FieldsDataType(
-                newRowType, dataType.getConversionClass(), dataType.getChildren());
-    }
+    // --------------------------------------------------------------------------------------------
+    // Composite data type utilities
+    // --------------------------------------------------------------------------------------------
 
     /** Appends the given list of fields to an existing row data type. */
-    public static DataType appendRowFields(DataType dataType, List<DataTypes.Field> fields) {
+    public static DataType appendRowFields(DataType dataType, List<Field> fields) {
         Preconditions.checkArgument(dataType.getLogicalType().is(ROW), "Row data type expected.");
-        if (fields.size() == 0) {
+        if (fields.isEmpty()) {
             return dataType;
         }
         DataType newRow =
@@ -128,93 +136,6 @@ public final class DataTypeUtils {
         return newRow;
     }
 
-    /**
-     * Creates a {@link DataType} from the given {@link LogicalType} with internal data structures.
-     */
-    public static DataType toInternalDataType(LogicalType logicalType) {
-        final DataType defaultDataType = TypeConversions.fromLogicalToDataType(logicalType);
-        return toInternalDataType(defaultDataType);
-    }
-
-    /** Creates a {@link DataType} from the given {@link DataType} with internal data structures. */
-    public static DataType toInternalDataType(DataType dataType) {
-        return dataType.bridgedTo(toInternalConversionClass(dataType.getLogicalType()));
-    }
-
-    /** Checks whether a given data type is an internal data structure. */
-    public static boolean isInternal(DataType dataType) {
-        return isInternal(dataType, true);
-    }
-
-    /** Checks whether a given data type is an internal data structure. */
-    public static boolean isInternal(DataType dataType, boolean autobox) {
-        final Class<?> clazz;
-        if (autobox) {
-            clazz = primitiveToWrapper(dataType.getConversionClass());
-        } else {
-            clazz = dataType.getConversionClass();
-        }
-
-        return clazz == toInternalConversionClass(dataType.getLogicalType());
-    }
-
-    /**
-     * Replaces the {@link LogicalType} of a {@link DataType}, i.e., it keeps the bridging class.
-     */
-    public static DataType replaceLogicalType(DataType dataType, LogicalType replacement) {
-        return LogicalTypeDataTypeConverter.toDataType(replacement)
-                .bridgedTo(dataType.getConversionClass());
-    }
-
-    /**
-     * Removes time attributes from the {@link DataType}. As everywhere else in the code base, this
-     * method does not support nested time attributes for now.
-     */
-    public static DataType removeTimeAttribute(DataType dataType) {
-        final LogicalType type = dataType.getLogicalType();
-        if (type.is(LogicalTypeFamily.TIMESTAMP)) {
-            return replaceLogicalType(dataType, removeTimeAttributes(type));
-        }
-        return dataType;
-    }
-
-    /**
-     * Transforms the given data type to a different data type using the given transformations.
-     *
-     * @see #transform(DataTypeFactory, DataType, TypeTransformation...)
-     */
-    public static DataType transform(
-            DataType typeToTransform, TypeTransformation... transformations) {
-        return transform(null, typeToTransform, transformations);
-    }
-
-    /**
-     * Transforms the given data type to a different data type using the given transformations.
-     *
-     * <p>The transformations will be called in the given order. In case of constructed or composite
-     * types, a transformation will be applied transitively to children first.
-     *
-     * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can
-     * be transformed.
-     *
-     * @param factory {@link DataTypeFactory} if available
-     * @param typeToTransform data type to be transformed.
-     * @param transformations the transformations to transform data type to another type.
-     * @return the new data type
-     */
-    public static DataType transform(
-            @Nullable DataTypeFactory factory,
-            DataType typeToTransform,
-            TypeTransformation... transformations) {
-        Preconditions.checkArgument(
-                transformations.length > 0, "transformations should not be empty.");
-        DataType newType = typeToTransform;
-        for (TypeTransformation transformation : transformations) {
-            newType = newType.accept(new DataTypeTransformer(factory, transformation));
-        }
-        return newType;
-    }
-
     /**
      * Expands a composite {@link DataType} to a corresponding {@link ResolvedSchema}. Useful for
      * flattening a column or mapping a physical to logical type of a table source
@@ -241,6 +162,56 @@ public final class DataTypeUtils {
         throw new IllegalArgumentException("Expected a composite type");
     }
 
+    private static ResolvedSchema expandCompositeType(FieldsDataType dataType) {
+        DataType[] fieldDataTypes = dataType.getChildren().toArray(new DataType[0]);
+        return dataType.getLogicalType()
+                .accept(
+                        new LogicalTypeDefaultVisitor<>() {
+                            @Override
+                            public ResolvedSchema visit(RowType rowType) {
+                                return expandCompositeType(rowType, fieldDataTypes);
+                            }
+
+                            @Override
+                            public ResolvedSchema visit(StructuredType structuredType) {
+                                return expandCompositeType(structuredType, fieldDataTypes);
+                            }
+
+                            @Override
+                            public ResolvedSchema visit(DistinctType distinctType) {
+                                return distinctType.getSourceType().accept(this);
+                            }
+
+                            @Override
+                            protected ResolvedSchema defaultMethod(LogicalType logicalType) {
+                                throw new IllegalArgumentException("Expected a composite type");
+                            }
+                        });
+    }
+
+    private static ResolvedSchema expandLegacyCompositeType(DataType dataType) {
+        // legacy composite type
+        CompositeType<?> compositeType =
+                (CompositeType<?>)
+                        ((LegacyTypeInformationType<?>) dataType.getLogicalType())
+                                .getTypeInformation();
+
+        String[] fieldNames = compositeType.getFieldNames();
+        DataType[] fieldTypes =
+                Arrays.stream(fieldNames)
+                        .map(compositeType::getTypeAt)
+                        .map(TypeConversions::fromLegacyInfoToDataType)
+                        .toArray(DataType[]::new);
+
+        return ResolvedSchema.physical(fieldNames, fieldTypes);
+    }
+
+    private static ResolvedSchema expandCompositeType(
+            LogicalType compositeType, DataType[] fieldDataTypes) {
+        final String[] fieldNames = getFieldNames(compositeType).toArray(new String[0]);
+        return ResolvedSchema.physical(fieldNames, fieldDataTypes);
+    }
+
     /**
      * Retrieves a nested field from a composite type at given position.
     *
@@ -306,50 +277,89 @@ public final class DataTypeUtils {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Logical type utilities
+    // --------------------------------------------------------------------------------------------
+
     /**
-     * The {@link DataType} class can only partially verify the conversion class. This method can
-     * perform the final check when we know if the data type should be used for input.
+     * Replaces the {@link LogicalType} of a {@link DataType}, i.e., it keeps the bridging class.
     */
-    public static void validateInputDataType(DataType dataType) {
-        dataType.accept(DataTypeInputClassValidator.INSTANCE);
+    public static DataType replaceLogicalType(DataType dataType, LogicalType replacement) {
+        return LogicalTypeDataTypeConverter.toDataType(replacement)
+                .bridgedTo(dataType.getConversionClass());
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Conversion class utilities
+    // --------------------------------------------------------------------------------------------
+
     /**
-     * The {@link DataType} class can only partially verify the conversion class. This method can
-     * perform the final check when we know if the data type should be used for output.
+     * Creates a {@link DataType} from the given {@link LogicalType} with internal data structures.
     */
-    public static void validateOutputDataType(DataType dataType) {
-        dataType.accept(DataTypeOutputClassValidator.INSTANCE);
+    public static DataType toInternalDataType(LogicalType logicalType) {
+        final DataType defaultDataType = TypeConversions.fromLogicalToDataType(logicalType);
+        return toInternalDataType(defaultDataType);
     }
 
-    /** Returns a PROCTIME data type. */
-    public static DataType createProctimeDataType() {
-        return new AtomicDataType(new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3));
+    /** Creates a {@link DataType} from the given {@link DataType} with internal data structures. */
+    public static DataType toInternalDataType(DataType dataType) {
+        return dataType.bridgedTo(toInternalConversionClass(dataType.getLogicalType()));
+    }
+
+    /** Checks whether a given data type is an internal data structure. */
+    public static boolean isInternal(DataType dataType) {
+        return isInternal(dataType, true);
+    }
+
+    /** Checks whether a given data type is an internal data structure. */
+    public static boolean isInternal(DataType dataType, boolean autobox) {
+        final Class<?> clazz;
+        if (autobox) {
+            clazz = primitiveToWrapper(dataType.getConversionClass());
+        } else {
+            clazz = dataType.getConversionClass();
+        }
+
+        return clazz == toInternalConversionClass(dataType.getLogicalType());
     }
 
     /**
-     * {@link ResolvedSchema#toPhysicalRowDataType()} erases time attributes. This method keeps them
-     * during conversion for very specific use cases mostly in Table API.
+     * Checks whether this data type and its children use the {@link
+     * LogicalType#getDefaultConversion()} defined by the logical type.
     */
-    public static DataType fromResolvedSchemaPreservingTimeAttributes(
-            ResolvedSchema resolvedSchema) {
-        final List<String> fieldNames = resolvedSchema.getColumnNames();
-        final List<DataType> fieldTypes = resolvedSchema.getColumnDataTypes();
-        return DataTypes.ROW(
-                        IntStream.range(0, fieldNames.size())
-                                .mapToObj(
-                                        pos ->
-                                                DataTypes.FIELD(
-                                                        fieldNames.get(pos), fieldTypes.get(pos)))
-                                .collect(Collectors.toList()))
-                .notNull();
+    public static boolean isDefaultClassNested(DataType dataType) {
+        return isDefaultClass(dataType)
+                && dataType.getChildren().stream().allMatch(DataTypeUtils::isDefaultClassNested);
     }
 
-    private DataTypeUtils() {
-        // no instantiation
+    /**
+     * Checks whether this data type uses the {@link LogicalType#getDefaultConversion()} defined by
+     * the logical type.
+     */
+    public static boolean isDefaultClass(DataType dataType) {
+        return Objects.equals(
+                dataType.getConversionClass(), dataType.getLogicalType().getDefaultConversion());
     }
 
-    // ------------------------------------------------------------------------------------------
+    // --------------------------------------------------------------------------------------------
+    // Data type validation
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * The {@link DataType} class can only partially verify the conversion class. This method can
+     * perform the final check when we know if the data type should be used for input.
+     */
+    public static void validateInputDataType(DataType dataType) {
+        dataType.accept(DataTypeInputClassValidator.INSTANCE);
+    }
+
+    /**
+     * The {@link DataType} class can only partially verify the conversion class. This method can
+     * perform the final check when we know if the data type should be used for output.
+     */
+    public static void validateOutputDataType(DataType dataType) {
+        dataType.accept(DataTypeOutputClassValidator.INSTANCE);
+    }
 
     private static class DataTypeInputClassValidator extends DataTypeDefaultVisitor<Void> {
 
@@ -388,6 +398,47 @@ public final class DataTypeUtils {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Data type transformations
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Transforms the given data type to a different data type using the given transformations.
+     *
+     * @see #transform(DataTypeFactory, DataType, TypeTransformation...)
+     */
+    public static DataType transform(
+            DataType typeToTransform, TypeTransformation... transformations) {
+        return transform(null, typeToTransform, transformations);
+    }
+
+    /**
+     * Transforms the given data type to a different data type using the given transformations.
+     *
+     * <p>The transformations will be called in the given order. In case of constructed or composite
+     * types, a transformation will be applied transitively to children first.
+     *
+     * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can
+     * be transformed.
+     *
+     * @param factory {@link DataTypeFactory} if available
+     * @param typeToTransform data type to be transformed.
+     * @param transformations the transformations to transform data type to another type.
+     * @return the new data type
+     */
+    public static DataType transform(
+            @Nullable DataTypeFactory factory,
+            DataType typeToTransform,
+            TypeTransformation... transformations) {
+        Preconditions.checkArgument(
+                transformations.length > 0, "transformations should not be empty.");
+        DataType newType = typeToTransform;
+        for (TypeTransformation transformation : transformations) {
+            newType = newType.accept(new DataTypeTransformer(factory, transformation));
+        }
+        return newType;
+    }
+
     /**
      * Transforms a {@link DataType}.
     *
@@ -543,53 +594,81 @@ public final class DataTypeUtils {
         }
     }
 
-    private static ResolvedSchema expandCompositeType(FieldsDataType dataType) {
-        DataType[] fieldDataTypes = dataType.getChildren().toArray(new DataType[0]);
-        return dataType.getLogicalType()
-                .accept(
-                        new LogicalTypeDefaultVisitor<ResolvedSchema>() {
-                            @Override
-                            public ResolvedSchema visit(RowType rowType) {
-                                return expandCompositeType(rowType, fieldDataTypes);
-                            }
+    // --------------------------------------------------------------------------------------------
+    // Structured type alignment
+    // --------------------------------------------------------------------------------------------
 
-                            @Override
-                            public ResolvedSchema visit(StructuredType structuredType) {
-                                return expandCompositeType(structuredType, fieldDataTypes);
-                            }
+    /**
+     * Aligns the {@link DataType} and its nested conversion classes with the given {@link
+     * StructuredType#getImplementationClass()}.
+     *
+     * <p>By default, a data type is created from a {@link LogicalType} and uses default conversion
+     * classes. But for conversion to the implementation class, the data type must reflect the
+     * correct expected classes (e.g. {@code List<Integer>} instead of {@code Integer[]}) for all
+     * attributes.
+     */
+    public static DataType alignStructuredTypes(DataTypeFactory factory, DataType dataType) {
+        return dataType.accept(new StructuredTypeAligner(factory));
+    }
 
-                            @Override
-                            public ResolvedSchema visit(DistinctType distinctType) {
-                                return distinctType.getSourceType().accept(this);
-                            }
+    private static class StructuredTypeAligner implements DataTypeVisitor<DataType> {
 
-                            @Override
-                            protected ResolvedSchema defaultMethod(LogicalType logicalType) {
-                                throw new IllegalArgumentException("Expected a composite type");
-                            }
-                        });
-    }
+        private final DataTypeFactory factory;
 
-    private static ResolvedSchema expandLegacyCompositeType(DataType dataType) {
-        // legacy composite type
-        CompositeType<?> compositeType =
-                (CompositeType<?>)
-                        ((LegacyTypeInformationType<?>) dataType.getLogicalType())
-                                .getTypeInformation();
+        public StructuredTypeAligner(DataTypeFactory factory) {
+            this.factory = factory;
+        }
 
-        String[] fieldNames = compositeType.getFieldNames();
-        DataType[] fieldTypes =
-                Arrays.stream(fieldNames)
-                        .map(compositeType::getTypeAt)
-                        .map(TypeConversions::fromLegacyInfoToDataType)
-                        .toArray(DataType[]::new);
+        @Override
+        public DataType visit(AtomicDataType atomicDataType) {
+            return atomicDataType;
+        }
 
-        return ResolvedSchema.physical(fieldNames, fieldTypes);
+        @Override
+        public DataType visit(CollectionDataType collectionDataType) {
+            return new CollectionDataType(
+                    collectionDataType.getLogicalType(),
+                    collectionDataType.getConversionClass(),
+                    collectionDataType.getElementDataType().accept(this));
+        }
+
+        @Override
+        public DataType visit(FieldsDataType fieldsDataType) {
+            final LogicalType logicalType = fieldsDataType.getLogicalType();
+            if (logicalType.is(LogicalTypeRoot.STRUCTURED_TYPE)
+                    && logicalType.getDefaultConversion() != StructuredType.FALLBACK_CONVERSION) {
+                return alignDataType(fieldsDataType, (StructuredType) logicalType);
+            }
+            return new FieldsDataType(
+                    fieldsDataType.getLogicalType(),
+                    fieldsDataType.getConversionClass(),
+                    fieldsDataType.getChildren().stream()
+                            .map(f -> f.accept(this))
+                            .collect(Collectors.toList()));
+        }
+
+        @Override
+        public DataType visit(KeyValueDataType keyValueDataType) {
+            return new KeyValueDataType(
+                    keyValueDataType.getLogicalType(),
+                    keyValueDataType.getConversionClass(),
+                    keyValueDataType.getKeyDataType().accept(this),
+                    keyValueDataType.getValueDataType().accept(this));
+        }
+
+        private DataType alignDataType(DataType defaultDataType, StructuredType structuredType) {
+            final Class<?> implementationClass =
+                    structuredType.getImplementationClass().orElseThrow(IllegalStateException::new);
+            try {
+                return DataTypeExtractor.extractFromStructuredClass(factory, implementationClass);
+            } catch (ValidationException e) {
+                // Necessary for legacy code paths and tests written in Scala
+                return defaultDataType;
+            }
+        }
     }
 
-    private static ResolvedSchema expandCompositeType(
-            LogicalType compositeType, DataType[] fieldDataTypes) {
-        final String[] fieldNames = getFieldNames(compositeType).toArray(new String[0]);
-        return ResolvedSchema.physical(fieldNames, fieldDataTypes);
+    private DataTypeUtils() {
+        // no instantiation
     }
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
index 430111543bc..f4ad686cd98 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
@@ -19,7 +19,9 @@ package org.apache.flink.table.types.utils;
 
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
@@ -234,4 +236,40 @@ class DataTypeUtilsTest {
                                 + java.util.HashMap.class.getName()
                                 + "'.");
     }
+
+    @Test
+    void testStructuredTypeAlignment() {
+        final DataTypeFactoryMock factoryMock = new DataTypeFactoryMock();
+
+        final DataType expectedDataType = DataTypes.of(NestedPojo.class).toDataType(factoryMock);
+
+        // Erases conversion classes and uses default conversion classes
+        final DataType defaultDataType = DataTypes.of(expectedDataType.getLogicalType());
+
+        final DataType dataType =
+                DataTypeUtils.alignStructuredTypes(
+                        factoryMock, DataTypes.ROW(DataTypes.INT(), defaultDataType));
+        assertThat(dataType).isEqualTo(DataTypes.ROW(DataTypes.INT(), expectedDataType));
+    }
+
+    /** POJO that does not use default conversions for certain data types. */
+    public static class Pojo {
+        // Primitive instead of boxed class
+        public int i;
+
+        // List instead of array and java.sql.Timestamp instead of java.time.LocalDateTime
+        public List<Timestamp> timestamps;
+
+        // byte[] as annotated STRING data type
+        @DataTypeHint("STRING")
+        public byte[] string;
+    }
+
+    /** Nested POJO that does not use default conversions for certain data types. */
+    public static class NestedPojo {
+
+        public Tuple2<Boolean, Double> tuple;
+
+        public List<Pojo> pojoList;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index e5bcfa075ce..411470f5dbc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.connector.RowLevelModificationScanContext;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
 import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
@@ -139,7 +140,7 @@ public final class DynamicSinkUtils {
         final ContextResolvedTable contextResolvedTable =
                 ContextResolvedTable.anonymous("collect", catalogTable);
 
-        final DataType consumedDataType = fixCollectDataType(dataTypeFactory, schema);
+        final DataType consumedDataType = deriveCollectDataType(dataTypeFactory, schema);
 
         final String zone = configuration.get(TableConfigOptions.LOCAL_TIME_ZONE);
         final ZoneId zoneId =
@@ -897,17 +898,23 @@ public final class DynamicSinkUtils {
 
     // --------------------------------------------------------------------------------------------
 
-    /** Temporary solution until we drop legacy types. */
-    private static DataType fixCollectDataType(
+    /**
+     * Prepares a {@link DataType} for {@link DataStructureConverter} from {@link ResolvedSchema}.
+     */
+    private static DataType deriveCollectDataType(
             DataTypeFactory dataTypeFactory, ResolvedSchema schema) {
+        // TODO erase the conversion class earlier when dropping legacy code, esp. FLINK-22321
         final DataType fixedDataType =
                 DataTypeUtils.transform(
                         dataTypeFactory,
                         schema.toSourceRowDataType(),
                         TypeTransformations.legacyRawToTypeInfoRaw(),
                         TypeTransformations.legacyToNonLegacy());
-        // TODO erase the conversion class earlier when dropping legacy code, esp. FLINK-22321
-        return TypeConversions.fromLogicalToDataType(fixedDataType.getLogicalType());
+        final DataType defaultDataType =
+                TypeConversions.fromLogicalToDataType(fixedDataType.getLogicalType());
+
+        // Structured types might not use default conversion classes.
+        return DataTypeUtils.alignStructuredTypes(dataTypeFactory, defaultDataType);
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
index 8593493bac3..28009f1a236 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.DataTypes.Field;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
@@ -30,7 +31,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.types.utils.DataTypeUtils.isInternal;
@@ -96,7 +96,6 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     // ROW, STRUCTURED_TYPE, DISTINCT_TYPE
     static final String FIELD_NAME_FIELDS = "fields";
     static final String FIELD_NAME_FIELD_NAME = "name";
-    static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
 
     DataTypeJsonSerializer() {
         super(DataType.class);
@@ -106,7 +105,7 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     public void serialize(
             DataType dataType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
             throws IOException {
-        if (isDefaultClassNested(dataType)) {
+        if (DataTypeUtils.isDefaultClassNested(dataType)) {
             serializerProvider.defaultSerializeValue(dataType.getLogicalType(), jsonGenerator);
         } else {
             jsonGenerator.writeStartObject();
@@ -120,7 +119,7 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     private static void serializeClass(DataType dataType, JsonGenerator jsonGenerator)
             throws IOException {
         // skip the conversion class if only nested types contain custom conversion classes
-        if (!isDefaultClass(dataType)) {
+        if (!DataTypeUtils.isDefaultClass(dataType)) {
             jsonGenerator.writeStringField(
                     FIELD_NAME_CONVERSION_CLASS, dataType.getConversionClass().getName());
         }
@@ -149,7 +148,10 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
             case STRUCTURED_TYPE:
                 final List<Field> nonDefaultFields =
                         DataType.getFields(dataType).stream()
-                                .filter(field -> !isDefaultClassNested(field.getDataType()))
+                                .filter(
+                                        field ->
+                                                !DataTypeUtils.isDefaultClassNested(
+                                                        field.getDataType()))
                                 .collect(Collectors.toList());
                 if (nonDefaultFields.isEmpty()) {
                     break;
@@ -167,7 +169,7 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
                 break;
             case DISTINCT_TYPE:
                 final DataType sourceDataType = dataType.getChildren().get(0);
-                if (!isDefaultClassNested(sourceDataType)) {
+                if (!DataTypeUtils.isDefaultClassNested(sourceDataType)) {
                     serializeClass(sourceDataType, jsonGenerator);
                 }
                 break;
@@ -178,22 +180,11 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
 
     private static void serializeFieldIfNotDefaultClass(
             DataType dataType, String fieldName, JsonGenerator jsonGenerator) throws IOException {
-        if (!isDefaultClassNested(dataType)) {
+        if (!DataTypeUtils.isDefaultClassNested(dataType)) {
             jsonGenerator.writeFieldName(fieldName);
             jsonGenerator.writeStartObject();
             serializeClass(dataType, jsonGenerator);
             jsonGenerator.writeEndObject();
         }
     }
-
-    private static boolean isDefaultClassNested(DataType dataType) {
-        return isDefaultClass(dataType)
-                && dataType.getChildren().stream()
-                        .allMatch(DataTypeJsonSerializer::isDefaultClassNested);
-    }
-
-    private static boolean isDefaultClass(DataType dataType) {
-        return Objects.equals(
-                dataType.getConversionClass(), dataType.getLogicalType().getDefaultConversion());
-    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java
index 61bfcd01eb2..d7d6558143c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java
@@ -23,6 +23,10 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.types.logical.StructuredType;
 
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Stream;
 
 /** Tests for functions dealing with {@link StructuredType}. */
@@ -45,7 +49,7 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                         // Same value from CAST
                         .testSqlResult(
                                 "Type1Constructor(f0, f1) = CAST((14, 'Bob') AS "
-                                        + Type1.TYPE
+                                        + Type1.TYPE_STRING
                                         + ")",
                                 true,
                                 DataTypes.BOOLEAN())
@@ -57,7 +61,7 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                         // Different value from CAST
                         .testSqlResult(
                                 "Type1Constructor(f0, f1) = CAST((15, 'Alice') AS "
-                                        + Type1.TYPE
+                                        + Type1.TYPE_STRING
                                         + ")",
                                 false,
                                 DataTypes.BOOLEAN())
@@ -68,7 +72,7 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                         // Different class name from CAST
                         .testSqlValidationError(
                                 "Type1Constructor(f0, f1) = CAST((14, 'Bob') AS "
-                                        + Type2.TYPE
+                                        + Type2.TYPE_STRING
                                         + ")",
                                 "Incompatible structured types")
                         // Same class name but different fields
@@ -83,9 +87,24 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                                         "NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = CAST("
                                                 + "(CAST((14, 'Bob') AS %s), CAST((15, 'Alice') AS %s))"
                                                 + " AS %s)",
-                                        Type1.TYPE, Type2.TYPE, NestedType.TYPE),
+                                        Type1.TYPE_STRING,
+                                        Type2.TYPE_STRING,
+                                        NestedType.TYPE_STRING),
                                 true,
-                                DataTypes.BOOLEAN()));
+                                DataTypes.BOOLEAN())
+                        .testSqlResult(
+                                String.format(
+                                        "CAST((42, ARRAY['1', '2', '3'], MAP['A', TIMESTAMP '2025-06-20 12:00:01', 'B', TIMESTAMP '2025-06-20 12:00:02']) AS %s)",
+                                        NonDefaultType.TYPE_STRING),
+                                new NonDefaultType(
+                                        42,
+                                        List.of("1", "2", "3"),
+                                        Map.of(
+                                                "A",
+                                                Timestamp.valueOf("2025-06-20 12:00:01"),
+                                                "B",
+                                                Timestamp.valueOf("2025-06-20 12:00:02"))),
+                                DataTypes.of(NonDefaultType.class).notNull()));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -94,8 +113,8 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
     /** Structured type Type1. */
     public static class Type1 {
-        private static final String TYPE =
-                "STRUCTURED<'" + Type1.class.getName() + "', a INT, b STRING>";
+        private static final String TYPE_STRING =
+                String.format("STRUCTURED<'%s', a INT, b STRING>", Type1.class.getName());
 
         public Integer a;
         public String b;
@@ -112,8 +131,8 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
     /** Structured type Type2. */
     public static class Type2 {
-        private static final String TYPE =
-                "STRUCTURED<'" + Type2.class.getName() + "', a INT, b STRING>";
+        private static final String TYPE_STRING =
+                String.format("STRUCTURED<'%s', a INT, b STRING>", Type2.class.getName());
 
         public Integer a;
         public String b;
@@ -130,11 +149,10 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
     /** Structured type NestedType. */
     public static class NestedType {
-        private static final String TYPE =
-                String.format(
-                        "STRUCTURED<'" + NestedType.class.getName() + "', n1 %s, n2 %s>",
-                        Type1.TYPE,
-                        Type2.TYPE);
+        private static final String TYPE_STRING =
+                String.format(
+                        "STRUCTURED<'%s', n1 %s, n2 %s>",
+                        NestedType.class.getName(), Type1.TYPE_STRING, Type2.TYPE_STRING);
 
         public Type1 n1;
         public Type2 n2;
@@ -148,4 +166,39 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
         }
     }
+
+    /**
+     * Structured type that does not use default conversion classes but e.g. {@link List} for arrays
+     * and primitive types.
+     */
+    public static class NonDefaultType {
+        private static final String TYPE_STRING =
+                String.format(
+                        "STRUCTURED<'%s', i INT NOT NULL, l ARRAY<STRING>, m MAP<STRING, TIMESTAMP(9)>>",
+                        NonDefaultType.class.getName());
+
+        public final int i;
+        public final List<String> l;
+        public final Map<String, Timestamp> m;
+
+        public NonDefaultType(int i, List<String> l, Map<String, Timestamp> m) {
+            this.i = i;
+            this.l = l;
+            this.m = m;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final NonDefaultType that = (NonDefaultType) o;
+            return i == that.i && Objects.equals(l, that.l) && Objects.equals(m, that.m);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(i, l, m);
+        }
+    }
 }
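
---

A minimal usage sketch (not part of the commit) of what this change means for users: when a
SQL expression produces an inline structured type, TableResult.collect() should now hand back
the value with the aligned conversion class instead of falling back to a generic Row. The
`User` POJO, the class name, and the batch session setup below are illustrative assumptions;
the CAST syntax mirrors StructuredFunctionsITCase above.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    public final class CollectInlineStructuredType {

        /** Illustrative POJO; it must be resolvable on the user classpath. */
        public static class User {
            public Integer a;
            public String b;
        }

        public static void main(String[] args) throws Exception {
            final TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // CAST to an inline structured type, as exercised by the ITCase above.
            final TableResult result =
                    env.executeSql(
                            "SELECT CAST((14, 'Bob') AS STRUCTURED<'"
                                    + User.class.getName()
                                    + "', a INT, b STRING>)");
            try (CloseableIterator<Row> it = result.collect()) {
                final Row row = it.next();
                // With FLINK-38090, the collect data type is aligned to the
                // implementation class, so this should print the User class name.
                System.out.println(row.getField(0).getClass().getName());
            }
        }
    }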
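A second sketch of the new DataTypes.STRUCTURED factory variants added in DataTypes.java above.
The class name "com.example.User" is a placeholder; per the new javadoc it does not need to be
on the classpath. Note the overload behavior assumed here: passing only resolved Fields selects
the Field... variant, while mixing in an UnresolvedField selects the new AbstractField...
variant that produces an UnresolvedDataType for later resolution by a DataTypeFactory.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.UnresolvedDataType;

    public final class StructuredFactoryMethods {

        public static void main(String[] args) {
            // Resolved variant: all fields are fully resolved DataTypes.
            final DataType resolved =
                    DataTypes.STRUCTURED(
                            "com.example.User",
                            DataTypes.FIELD("a", DataTypes.INT()),
                            DataTypes.FIELD("b", DataTypes.STRING()));

            // Unresolved variant: DataTypes.of(Class) yields an UnresolvedDataType,
            // so the second field is an UnresolvedField and the AbstractField...
            // overload applies; a DataTypeFactory resolves it later.
            final UnresolvedDataType unresolved =
                    DataTypes.STRUCTURED(
                            "com.example.User",
                            DataTypes.FIELD("a", DataTypes.INT()),
                            DataTypes.FIELD("b", DataTypes.of(String.class)));

            System.out.println(resolved);
            System.out.println(unresolved);
        }
    }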