This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 03530b6085d [FLINK-38090][table] Align data type of inline structured types for TableResult.collect()
03530b6085d is described below

commit 03530b6085deb016a61111dbca9b92825cc6cf7a
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 14 12:39:10 2025 +0200

    [FLINK-38090][table] Align data type of inline structured types for TableResult.collect()
    
    This closes #26784.
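    
    For illustration (an editor's sketch, not part of the patch): after this
    change, the data type reported for rows of an inline structured type
    should match the declared type when fetched through collect(). A minimal
    sketch assuming a build with this patch applied; the query is a
    placeholder only:
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.types.Row;
        import org.apache.flink.util.CloseableIterator;
    
        public class CollectSketch {
            public static void main(String[] args) throws Exception {
                TableEnvironment env =
                        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
                // Queries producing inline structured types should now expose
                // the same data type through TableResult.collect().
                try (CloseableIterator<Row> it =
                        env.executeSql("SELECT 1 AS v").collect()) {
                    while (it.hasNext()) {
                        System.out.println(it.next());
                    }
                }
            }
        }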
---
 .../flink/table/catalog/DataTypeFactoryImpl.java   |  12 +-
 .../java/org/apache/flink/table/api/DataTypes.java | 186 +++++----
 .../flink/table/catalog/DataTypeFactory.java       |   5 +
 .../types/extraction/BaseMappingExtractor.java     |   2 +-
 .../table/types/extraction/DataTypeExtractor.java  |  33 +-
 .../table/types/extraction/DataTypeTemplate.java   |   4 +-
 .../table/types/inference/TypeTransformations.java |   4 +-
 ...ion.java => ConversionClassTransformation.java} |   6 +-
 .../flink/table/types/logical/StructuredType.java  |   3 +-
 .../flink/table/types/utils/DataTypeUtils.java     | 453 ++++++++++++---------
 .../flink/table/types/utils/DataTypeUtilsTest.java |  38 ++
 .../table/planner/connectors/DynamicSinkUtils.java |  17 +-
 .../nodes/exec/serde/DataTypeJsonSerializer.java   |  27 +-
 .../functions/StructuredFunctionsITCase.java       |  79 +++-
 14 files changed, 547 insertions(+), 322 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
index b623ea16df2..1fce774a81d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -64,6 +63,11 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
         this.serializerConfig = createSerializerConfig(classLoader, config, serializerConfig);
     }
 
+    @Override
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
     @Override
     public DataType createDataType(AbstractDataType<?> abstractDataType) {
         if (abstractDataType instanceof DataType) {
@@ -124,9 +128,9 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
     // --------------------------------------------------------------------------------------------
 
     /**
-     * Creates a lazy {@link ExecutionConfig} that contains options for {@link TypeSerializer}s with
-     * information from existing {@link ExecutionConfig} (if available) enriched with table {@link
-     * ReadableConfig}.
+     * Creates a lazy {@link SerializerConfig} that contains options for {@link TypeSerializer}s
+     * with information from existing {@link SerializerConfig} (if available) enriched with table
+     * {@link ReadableConfig}.
      */
     private static Supplier<SerializerConfig> createSerializerConfig(
             ClassLoader classLoader, ReadableConfig config, SerializerConfig serializerConfig) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 74683fe61c4..d9b514cd158 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -129,7 +129,7 @@ public final class DataTypes {
      * // returns TIMESTAMP(9)
      * of(java.time.LocalDateTime.class)
      *
-     * // returns an anonymous, unregistered structured type
+     * // returns an unregistered (i.e. declared inline) structured type
      * // that is deeply integrated into the API compared to opaque RAW types
      * class User {
      *
@@ -619,16 +619,11 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of an array of elements with same subtype.
+     * Unresolved {@link #ARRAY(DataType)}.
      *
-     * <p>Compared to the SQL standard, the maximum cardinality of an array cannot be specified but
-     * is fixed at {@link Integer#MAX_VALUE}. Also, any valid type is supported as a subtype.
-     *
-     * <p>Note: Compared to {@link #ARRAY(DataType)}, this method produces an {@link
-     * UnresolvedDataType}. In most of the cases, the {@link UnresolvedDataType} will be
-     * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
-     *
-     * @see ArrayType
+     * <p>Compared to {@link #ARRAY(DataType)}, this method produces an {@link UnresolvedDataType}.
+     * In most of the cases, the {@link UnresolvedDataType} will be automatically resolved by the
+     * API. At other locations, a {@link DataTypeFactory} is provided.
      */
     public static UnresolvedDataType ARRAY(AbstractDataType<?> elementDataType) {
         Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
@@ -654,18 +649,11 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of a multiset (=bag). Unlike a set, it allows for multiple instances for
-     * each of its elements with a common subtype. Each unique value (including {@code NULL}) is
-     * mapped to some multiplicity.
+     * Unresolved {@link #MULTISET(DataType)}.
      *
-     * <p>There is no restriction of element types; it is the responsibility of the user to ensure
-     * uniqueness.
-     *
-     * <p>Note: Compared to {@link #MULTISET(DataType)}, this method produces an {@link
+     * <p>Compared to {@link #MULTISET(DataType)}, this method produces an {@link
      * UnresolvedDataType}. In most of the cases, the {@link UnresolvedDataType} will be
      * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
-     *
-     * @see MultisetType
      */
     public static UnresolvedDataType MULTISET(AbstractDataType<?> elementDataType) {
         Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
@@ -694,18 +682,11 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of an associative array that maps keys (including {@code NULL}) to
-     * values (including {@code NULL}). A map cannot contain duplicate keys; each key can map to at
-     * most one value.
-     *
-     * <p>There is no restriction of key types; it is the responsibility of the user to ensure
-     * uniqueness. The map type is an extension to the SQL standard.
+     * Unresolved {@link #MAP(DataType, DataType)}.
      *
-     * <p>Note: Compared to {@link #MAP(DataType, DataType)}, this method produces an {@link
+     * <p>Compared to {@link #MAP(DataType, DataType)}, this method produces an {@link
      * UnresolvedDataType}. In most of the cases, the {@link UnresolvedDataType} will be
      * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
-     *
-     * @see MapType
      */
     public static UnresolvedDataType MAP(
             AbstractDataType<?> keyDataType, AbstractDataType<?> valueDataType) {
@@ -734,6 +715,7 @@ public final class DataTypes {
      * @see RowType
      */
     public static DataType ROW(Field... fields) {
+        checkFields(fields);
         final List<RowField> logicalFields =
                 Stream.of(fields)
                         .map(
@@ -777,27 +759,17 @@ public final class DataTypes {
     }
 
     /**
-     * Unresolved data type of a sequence of fields. A field consists of a field name, field type,
-     * and an optional description. The most specific type of a row of a table is a row type. In
-     * this case, each column of the row corresponds to the field of the row type that has the same
-     * ordinal position as the column.
-     *
-     * <p>Compared to the SQL standard, an optional field description simplifies the handling with
-     * complex structures.
+     * Unresolved {@link #ROW(Field...)}.
      *
      * <p>Use {@link #FIELD(String, AbstractDataType)} or {@link #FIELD(String, AbstractDataType,
      * String)} to construct fields.
      *
-     * <p>Note: Compared to {@link #ROW(Field...)}, this method produces an {@link
-     * UnresolvedDataType} with {@link UnresolvedField}s. In most of the cases, the {@link
-     * UnresolvedDataType} will be automatically resolved by the API. At other locations, a {@link
-     * DataTypeFactory} is provided.
-     *
-     * @see RowType
+     * <p>Compared to {@link #ROW(Field...)}, this method produces an {@link UnresolvedDataType}
+     * with {@link UnresolvedField}s. In most of the cases, the {@link UnresolvedDataType} will be
+     * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
      */
     public static UnresolvedDataType ROW(AbstractField... fields) {
-        Stream.of(fields)
-                .forEach(f -> Preconditions.checkNotNull(f, "Field definition must not be null."));
+        checkFields(fields);
         return new UnresolvedDataType(
                 () ->
                         String.format(
@@ -821,10 +793,11 @@ public final class DataTypes {
     }
 
     /**
-     * Data type of a sequence of fields.
+     * Unresolved {@link #ROW(DataType...)}.
      *
-     * <p>This is shortcut for {@link #ROW(AbstractField...)} where the field names will be
-     * generated using {@code f0, f1, f2, ...}.
+     * <p>Compared to {@link #ROW(DataType...)}, this method produces an {@link UnresolvedDataType}
+     * with {@link UnresolvedField}s. In most of the cases, the {@link UnresolvedDataType} will be
+     * automatically resolved by the API. At other locations, a {@link DataTypeFactory} is provided.
      */
     public static UnresolvedDataType ROW(AbstractDataType<?>... fieldDataTypes) {
         return ROW(
@@ -964,50 +937,99 @@ public final class DataTypes {
      * @see DataTypes#of(Class)
      * @see StructuredType
      */
-    public static <T> DataType STRUCTURED(Class<T> implementationClass, Field... fields) {
+    public static DataType STRUCTURED(Class<?> implementationClass, Field... fields) {
+        checkFields(fields);
         // some basic validation of the class to prevent common mistakes
         validateStructuredClass(implementationClass);
         return buildStructuredType(StructuredType.newBuilder(implementationClass), fields);
     }
 
     /**
-     * Data type of a user-defined object structured type. Structured types are identified by a
-     * class name and contain zero, one or more attributes. Each attribute has a name, a type, and
-     * an optional description. A type cannot be defined in such a way that one of its attribute
-     * types (transitively) refers to itself.
+     * Unresolved {@link #STRUCTURED(Class, Field...)}.
      *
-     * <p>Compared to {@link #ROW(Field...)}, which may also be considered a "struct-like" type,
-     * structured types are distinguishable even if they contain the same set of fields. For
-     * example, "Visit(amount DOUBLE)" is distinct from "Interaction(amount DOUBLE)" due its
-     * identifier.
-     *
-     * <p>This method allows for manually constructing an inline structured type. This is useful in
-     * cases where the reflective extraction using {@link DataTypes#of(Class)} is not applicable.
-     * However, {@link DataTypes#of(Class)} is the recommended approach for creating inline
-     * structured types, as it also considers {@link DataTypeHint}s.
+     * <p>Use {@link #FIELD(String, AbstractDataType)} or {@link #FIELD(String, AbstractDataType,
+     * String)} to construct fields.
      *
-     * <p>Structured types are internally converted by the system into suitable data structures.
-     * Serialization and equality checks (e.g. {@code hashCode/equals}) are managed by the system
-     * based on the logical type.
+     * <p>Compared to {@link #STRUCTURED(Class, Field...)}, this method produces an {@link
+     * UnresolvedDataType} with {@link UnresolvedField}s. In most of the cases, the {@link
+     * UnresolvedDataType} will be automatically resolved by the API. At other locations, a {@link
+     * DataTypeFactory} is provided.
+     */
+    public static UnresolvedDataType STRUCTURED(
+            Class<?> implementationClass, AbstractField... fields) {
+        checkFields(fields);
+        return new UnresolvedDataType(
+                () ->
+                        String.format(
+                                StructuredType.INLINE_FORMAT,
+                                implementationClass.getName(),
+                                Stream.of(fields)
+                                        .map(Object::toString)
+                                        .collect(Collectors.joining(", "))),
+                factory -> {
+                    final Field[] fieldsArray =
+                            Stream.of(fields)
+                                    .map(
+                                            f ->
+                                                    new Field(
+                                                            f.name,
+                                                            factory.createDataType(
+                                                                    f.getAbstractDataType()),
+                                                            f.description))
+                                    .toArray(Field[]::new);
+                    return STRUCTURED(implementationClass, fieldsArray);
+                });
+    }
+
+    /**
+     * Data type of a user-defined object structured type where the given class name is not in the
+     * classpath.
      *
-     * <p>If an optional implementation class is provided, the system will convert a structured
-     * object to a JVM object at the edges of the table ecosystem (e.g. when bridging to a function
-     * or connector). The implementation class must provide either a zero-argument constructor or a
-     * full constructor that assigns all attributes. The class name does not need to be resolvable
-     * in the classpath; it may be used solely to distinguish between objects with identical
-     * attribute sets. However, in Table API and UDF calls, the system will attempt to resolve the
-     * class name to an actual implementation class. If resolution fails, {@link Row} is used as a
-     * fallback.
+     * @see #STRUCTURED(Class, Field...) for more information on structured types.
+     */
+    public static DataType STRUCTURED(String className, Field... fields) {
+        return buildStructuredType(StructuredType.newBuilder(className), fields);
+    }
+
+    /**
+     * Unresolved {@link #STRUCTURED(String, Field...)}.
      *
-     * <p>Note: The caller of this method must ensure that the {@link DataType#getConversionClass()}
-     * of each field matches the corresponding attribute in the implementation class. Otherwise, a
-     * runtime exception may be thrown.
+     * <p>Use {@link #FIELD(String, AbstractDataType)} or {@link #FIELD(String, AbstractDataType,
+     * String)} to construct fields.
      *
-     * @see DataTypes#of(Class)
-     * @see StructuredType
+     * <p>Compared to {@link #STRUCTURED(Class, Field...)}, this method produces an {@link
+     * UnresolvedDataType} with {@link UnresolvedField}s. In most of the cases, the {@link
+     * UnresolvedDataType} will be automatically resolved by the API. At other locations, a {@link
+     * DataTypeFactory} is provided.
      */
-    public static <T> DataType STRUCTURED(String className, Field... fields) {
-        return buildStructuredType(StructuredType.newBuilder(className), fields);
+    public static UnresolvedDataType STRUCTURED(String className, AbstractField... fields) {
+        checkFields(fields);
+        return new UnresolvedDataType(
+                () ->
+                        String.format(
+                                StructuredType.INLINE_FORMAT,
+                                className,
+                                Stream.of(fields)
+                                        .map(Object::toString)
+                                        .collect(Collectors.joining(", "))),
+                factory -> {
+                    final Field[] fieldsArray =
+                            Stream.of(fields)
+                                    .map(
+                                            f ->
+                                                    new Field(
+                                                            f.name,
+                                                            factory.createDataType(
+                                                                    f.getAbstractDataType()),
+                                                            f.description))
+                                    .toArray(Field[]::new);
+                    return STRUCTURED(className, fieldsArray);
+                });
+    }
+
+    private static void checkFields(AbstractField... fields) {
+        Stream.of(fields)
+                .forEach(f -> Preconditions.checkNotNull(f, "Field definition must not be null."));
     }
 
     private static DataType buildStructuredType(StructuredType.Builder builder, Field... fields) {
@@ -1130,11 +1152,11 @@ public final class DataTypes {
     }
 
     /** Field definition with field name, data type, and a description. */
-    public static Field FIELD(String name, DataType dataType, String description) {
+    public static Field FIELD(String name, DataType dataType, @Nullable String description) {
         return new Field(
                 Preconditions.checkNotNull(name, "Field name must not be null."),
                 Preconditions.checkNotNull(dataType, "Field data type must not be null."),
-                Preconditions.checkNotNull(description, "Field description must not be null."));
+                description);
     }
 
     /**
@@ -1157,11 +1179,11 @@ public final class DataTypes {
      * UnresolvedField} that can contain an {@link UnresolvedDataType}.
      */
     public static UnresolvedField FIELD(
-            String name, AbstractDataType<?> fieldDataType, String 
description) {
+            String name, AbstractDataType<?> fieldDataType, @Nullable String 
description) {
         return new UnresolvedField(
                 Preconditions.checkNotNull(name, "Field name must not be 
null."),
                 Preconditions.checkNotNull(fieldDataType, "Field data type 
must not be null."),
-                Preconditions.checkNotNull(description, "Field description 
must not be null."));
+                description);
     }
 
     // --------------------------------------------------------------------------------------------
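
For illustration, a small editor's sketch of the DataTypes API after the
changes above, assuming this commit is applied; "com.example.User" and the
field layout are illustrative only:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    final class StructuredSketch {
        // Inline structured type identified only by a class name; per the new
        // Javadoc the class does not need to be resolvable in the classpath.
        static DataType byName() {
            return DataTypes.STRUCTURED(
                    "com.example.User",
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    // Field descriptions are now @Nullable, so null is accepted:
                    DataTypes.FIELD("score", DataTypes.INT(), null));
        }
    }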
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
index 5b364531cb5..c71990c393c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
@@ -41,6 +41,11 @@ import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter;
 @PublicEvolving
 public interface DataTypeFactory {
 
+    /** Returns the class loader of the current session. */
+    default ClassLoader getClassLoader() {
+        return Thread.currentThread().getContextClassLoader();
+    }
+
     /**
      * Creates a type out of an {@link AbstractDataType}.
      *
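
A small sketch of the new accessor in context, assuming this commit is
applied; the helper class and method are hypothetical:

    import org.apache.flink.table.catalog.DataTypeFactory;

    final class ClassLoaderSketch {
        // Resolves a structured type's implementation class via the session
        // class loader; the interface default falls back to the thread
        // context class loader.
        static Class<?> resolve(DataTypeFactory factory, String className)
                throws ClassNotFoundException {
            return Class.forName(className, true, factory.getClassLoader());
        }
    }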
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
index b8094957f83..c8de7463ed0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
@@ -610,7 +610,7 @@ abstract class BaseMappingExtractor {
                     arg.pos);
         }
         if (argumentHint != null) {
-            final DataTypeTemplate template = DataTypeTemplate.fromAnnotation(argumentHint, null);
+            final DataTypeTemplate template = DataTypeTemplate.fromAnnotation(argumentHint);
             if (template.inputGroup != null) {
                 return Optional.of(FunctionArgumentTemplate.ofInputGroup(template.inputGroup));
             }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index ad734649a96..a3cabcc660b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -107,6 +107,10 @@ public final class DataTypeExtractor {
         this.contextExplanation = contextExplanation;
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Methods that extract a data type from a JVM Type without any prior information
+    // --------------------------------------------------------------------------------------------
+
     /** Extracts a data type from a type without considering surrounding classes or templates. */
     public static DataType extractFromType(DataTypeFactory typeFactory, Type type) {
         return extractDataTypeWithClassContext(
@@ -114,7 +118,7 @@ public final class DataTypeExtractor {
     }
 
     /** Extracts a data type from a type without considering surrounding classes but templates. */
-    public static DataType extractFromType(
+    static DataType extractFromType(
             DataTypeFactory typeFactory, DataTypeTemplate template, Type type) {
         return extractDataTypeWithClassContext(typeFactory, template, null, type, "");
     }
@@ -239,6 +243,31 @@ public final class DataTypeExtractor {
                         method.getName(), baseClass.getName()));
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Methods that extract a data type from a JVM Class with prior logical information
+    // --------------------------------------------------------------------------------------------
+
+    public static DataType extractFromStructuredClass(
+            DataTypeFactory typeFactory, Class<?> implementationClass) {
+        final DataType dataType =
+                extractDataTypeWithClassContext(
+                        typeFactory,
+                        DataTypeTemplate.fromDefaults(),
+                        implementationClass.getEnclosingClass(),
+                        implementationClass,
+                        "");
+        if (!dataType.getLogicalType().is(LogicalTypeRoot.STRUCTURED_TYPE)) {
+            throw extractionError(
+                    "Structured data type expected for class '%s' but was: %s",
+                    implementationClass.getName(), dataType);
+        }
+        return dataType;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Supporting methods
+    // --------------------------------------------------------------------------------------------
+
     private static DataType extractDataTypeWithClassContext(
             DataTypeFactory typeFactory,
             DataTypeTemplate outerTemplate,
@@ -255,8 +284,6 @@ public final class DataTypeExtractor {
         return extractor.extractDataTypeOrRaw(outerTemplate, typeHierarchy, type);
     }
 
-    // --------------------------------------------------------------------------------------------
-
     private DataType extractDataTypeOrRaw(
             DataTypeTemplate outerTemplate, List<Type> typeHierarchy, Type type) {
         // best effort resolution of type variables, the resolved type can still be a variable
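
A short sketch of the new extraction entry point above, assuming this commit
is applied; the wrapper class and the POJO class passed in are placeholders:

    import org.apache.flink.table.catalog.DataTypeFactory;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.extraction.DataTypeExtractor;

    final class ExtractionSketch {
        // Fails with an extraction error if the class does not map to a
        // STRUCTURED_TYPE (e.g. if it would fall back to a RAW type).
        static DataType extract(DataTypeFactory factory, Class<?> pojoClass) {
            return DataTypeExtractor.extractFromStructuredClass(factory, pojoClass);
        }
    }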
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java
index 08426874ec2..785c5aae5f6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeTemplate.java
@@ -119,8 +119,8 @@ final class DataTypeTemplate {
      * Creates an instance from the given {@link ArgumentHint} with a resolved data type if
      * available.
      */
-    static DataTypeTemplate fromAnnotation(ArgumentHint argumentHint, @Nullable DataType dataType) {
-        return fromAnnotation(argumentHint.type(), dataType);
+    static DataTypeTemplate fromAnnotation(ArgumentHint argumentHint) {
+        return fromAnnotation(argumentHint.type(), null);
     }
 
     /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
index 28afb9a37d3..bdd18459b44 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeTransformations.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.types.inference;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.transforms.DataTypeConversionClassTransformation;
+import org.apache.flink.table.types.inference.transforms.ConversionClassTransformation;
 import org.apache.flink.table.types.inference.transforms.LegacyRawTypeTransformation;
 import org.apache.flink.table.types.inference.transforms.LegacyToNonLegacyTransformation;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -55,7 +55,7 @@ public final class TypeTransformations {
         conversions.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, Timestamp.class);
         conversions.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, Time.class);
         conversions.put(LogicalTypeRoot.DATE, Date.class);
-        return new DataTypeConversionClassTransformation(conversions);
+        return new ConversionClassTransformation(conversions);
     }
 
     /**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/DataTypeConversionClassTransformation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/ConversionClassTransformation.java
similarity index 87%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/DataTypeConversionClassTransformation.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/ConversionClassTransformation.java
index ff3416d1ce1..7b9b76153ba 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/DataTypeConversionClassTransformation.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/transforms/ConversionClassTransformation.java
@@ -27,16 +27,16 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import java.util.Map;
 
 /**
- * This type transformation transforms the specified data types to a new one with the expected
+ * This type transformation transforms the specified {@link DataType} to a new one with the expected
  * conversion class. The mapping from data type to conversion class is defined by the constructor
  * parameter {@link #conversions} map that maps from type root to the expected conversion class.
  */
 @Internal
-public class DataTypeConversionClassTransformation implements TypeTransformation {
+public class ConversionClassTransformation implements TypeTransformation {
 
     private final Map<LogicalTypeRoot, Class<?>> conversions;
 
-    public DataTypeConversionClassTransformation(Map<LogicalTypeRoot, Class<?>> conversions) {
+    public ConversionClassTransformation(Map<LogicalTypeRoot, Class<?>> conversions) {
         this.conversions = conversions;
     }
 
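A usage sketch of the renamed transformation, assuming this commit is applied;
the helper class is hypothetical, and the mapping mirrors the one built in the
TypeTransformations hunk above:

    import java.sql.Date;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.inference.transforms.ConversionClassTransformation;
    import org.apache.flink.table.types.logical.LogicalTypeRoot;
    import org.apache.flink.table.types.utils.DataTypeUtils;

    final class TransformSketch {
        static DataType dateAsSqlDate() {
            Map<LogicalTypeRoot, Class<?>> conversions = new HashMap<>();
            conversions.put(LogicalTypeRoot.DATE, Date.class);
            // DATE defaults to java.time.LocalDate; this bridges it to java.sql.Date.
            return DataTypeUtils.transform(
                    DataTypes.DATE(), new ConversionClassTransformation(conversions));
        }
    }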
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
index 2fe7549700e..8d1f271026e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -110,12 +110,11 @@ public final class StructuredType extends UserDefinedType {
 
     public static final String CATALOG_FORMAT = "%s";
     public static final String INLINE_FORMAT = "STRUCTURED<'%s', %s>";
+    public static final Class<?> FALLBACK_CONVERSION = Row.class;
 
     private static final Set<String> INPUT_OUTPUT_CONVERSION =
             conversionSet(Row.class.getName(), RowData.class.getName());
 
-    private static final Class<?> FALLBACK_CONVERSION = Row.class;
-
     /** Defines an attribute of a {@link StructuredType}. */
     @PublicEvolving
     public static final class StructuredAttribute implements Serializable {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 1745d4eebd2..6ca310896c1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -21,18 +21,19 @@ package org.apache.flink.table.types.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.DataTypeVisitor;
 import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.extraction.DataTypeExtractor;
 import org.apache.flink.table.types.inference.TypeTransformation;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DistinctType;
@@ -40,6 +41,7 @@ import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
@@ -49,7 +51,6 @@ import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -57,6 +58,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -76,47 +78,53 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInte
 @Internal
 public final class DataTypeUtils {
 
+    // --------------------------------------------------------------------------------------------
+    // Time attribute utilities
+    // --------------------------------------------------------------------------------------------
+
     /**
-     * @deprecated Use the {@link Projection} type
-     * @see Projection#project(DataType)
+     * Removes time attributes from the {@link DataType}. As everywhere else in the code base, this
+     * method does not support nested time attributes for now.
      */
-    @Deprecated
-    public static DataType projectRow(DataType dataType, int[][] indexPaths) {
-        return Projection.of(indexPaths).project(dataType);
+    public static DataType removeTimeAttribute(DataType dataType) {
+        final LogicalType type = dataType.getLogicalType();
+        if (type.is(LogicalTypeFamily.TIMESTAMP)) {
+            return replaceLogicalType(dataType, removeTimeAttributes(type));
+        }
+        return dataType;
+    }
+
+    /** Returns a PROCTIME data type. */
+    public static DataType createProctimeDataType() {
+        return new AtomicDataType(new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3));
     }
 
     /**
-     * @deprecated Use the {@link Projection} type
-     * @see Projection#project(DataType)
+     * {@link ResolvedSchema#toPhysicalRowDataType()} erases time attributes. This method keeps them
+     * during conversion for very specific use cases mostly in Table API.
      */
-    @Deprecated
-    public static DataType projectRow(DataType dataType, int[] indexPaths) {
-        return Projection.of(indexPaths).project(dataType);
+    public static DataType fromResolvedSchemaPreservingTimeAttributes(
+            ResolvedSchema resolvedSchema) {
+        final List<String> fieldNames = resolvedSchema.getColumnNames();
+        final List<DataType> fieldTypes = resolvedSchema.getColumnDataTypes();
+        return DataTypes.ROW(
+                        IntStream.range(0, fieldNames.size())
+                                .mapToObj(
+                                        pos ->
+                                                DataTypes.FIELD(
+                                                        fieldNames.get(pos), fieldTypes.get(pos)))
+                                .collect(Collectors.toList()))
+                .notNull();
     }
 
-    /** Removes a string prefix from the fields of the given row data type. */
-    public static DataType stripRowPrefix(DataType dataType, String prefix) {
-        Preconditions.checkArgument(dataType.getLogicalType().is(ROW), "Row data type expected.");
-        final RowType rowType = (RowType) dataType.getLogicalType();
-        final List<String> newFieldNames =
-                rowType.getFieldNames().stream()
-                        .map(
-                                s -> {
-                                    if (s.startsWith(prefix)) {
-                                        return s.substring(prefix.length());
-                                    }
-                                    return s;
-                                })
-                        .collect(Collectors.toList());
-        final LogicalType newRowType = LogicalTypeUtils.renameRowFields(rowType, newFieldNames);
-        return new FieldsDataType(
-                newRowType, dataType.getConversionClass(), dataType.getChildren());
-    }
+    // --------------------------------------------------------------------------------------------
+    // Composite data type utilities
+    // --------------------------------------------------------------------------------------------
 
     /** Appends the given list of fields to an existing row data type. */
-    public static DataType appendRowFields(DataType dataType, List<DataTypes.Field> fields) {
+    public static DataType appendRowFields(DataType dataType, List<Field> fields) {
         Preconditions.checkArgument(dataType.getLogicalType().is(ROW), "Row data type expected.");
-        if (fields.size() == 0) {
+        if (fields.isEmpty()) {
             return dataType;
         }
         DataType newRow =
@@ -128,93 +136,6 @@ public final class DataTypeUtils {
         return newRow;
     }
 
-    /**
-     * Creates a {@link DataType} from the given {@link LogicalType} with internal data structures.
-     */
-    public static DataType toInternalDataType(LogicalType logicalType) {
-        final DataType defaultDataType = TypeConversions.fromLogicalToDataType(logicalType);
-        return toInternalDataType(defaultDataType);
-    }
-
-    /** Creates a {@link DataType} from the given {@link DataType} with internal data structures. */
-    public static DataType toInternalDataType(DataType dataType) {
-        return dataType.bridgedTo(toInternalConversionClass(dataType.getLogicalType()));
-    }
-
-    /** Checks whether a given data type is an internal data structure. */
-    public static boolean isInternal(DataType dataType) {
-        return isInternal(dataType, true);
-    }
-
-    /** Checks whether a given data type is an internal data structure. */
-    public static boolean isInternal(DataType dataType, boolean autobox) {
-        final Class<?> clazz;
-        if (autobox) {
-            clazz = primitiveToWrapper(dataType.getConversionClass());
-        } else {
-            clazz = dataType.getConversionClass();
-        }
-
-        return clazz == toInternalConversionClass(dataType.getLogicalType());
-    }
-
-    /**
-     * Replaces the {@link LogicalType} of a {@link DataType}, i.e., it keeps the bridging class.
-     */
-    public static DataType replaceLogicalType(DataType dataType, LogicalType replacement) {
-        return LogicalTypeDataTypeConverter.toDataType(replacement)
-                .bridgedTo(dataType.getConversionClass());
-    }
-
-    /**
-     * Removes time attributes from the {@link DataType}. As everywhere else in the code base, this
-     * method does not support nested time attributes for now.
-     */
-    public static DataType removeTimeAttribute(DataType dataType) {
-        final LogicalType type = dataType.getLogicalType();
-        if (type.is(LogicalTypeFamily.TIMESTAMP)) {
-            return replaceLogicalType(dataType, removeTimeAttributes(type));
-        }
-        return dataType;
-    }
-
-    /**
-     * Transforms the given data type to a different data type using the given transformations.
-     *
-     * @see #transform(DataTypeFactory, DataType, TypeTransformation...)
-     */
-    public static DataType transform(
-            DataType typeToTransform, TypeTransformation... transformations) {
-        return transform(null, typeToTransform, transformations);
-    }
-
-    /**
-     * Transforms the given data type to a different data type using the given transformations.
-     *
-     * <p>The transformations will be called in the given order. In case of constructed or composite
-     * types, a transformation will be applied transitively to children first.
-     *
-     * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can
-     * be transformed.
-     *
-     * @param factory {@link DataTypeFactory} if available
-     * @param typeToTransform data type to be transformed.
-     * @param transformations the transformations to transform data type to another type.
-     * @return the new data type
-     */
-    public static DataType transform(
-            @Nullable DataTypeFactory factory,
-            DataType typeToTransform,
-            TypeTransformation... transformations) {
-        Preconditions.checkArgument(
-                transformations.length > 0, "transformations should not be empty.");
-        DataType newType = typeToTransform;
-        for (TypeTransformation transformation : transformations) {
-            newType = newType.accept(new DataTypeTransformer(factory, transformation));
-        }
-        return newType;
-    }
-
     /**
      * Expands a composite {@link DataType} to a corresponding {@link ResolvedSchema}. Useful for
      * flattening a column or mapping a physical to logical type of a table source
@@ -241,6 +162,56 @@ public final class DataTypeUtils {
         throw new IllegalArgumentException("Expected a composite type");
     }
 
+    private static ResolvedSchema expandCompositeType(FieldsDataType dataType) {
+        DataType[] fieldDataTypes = dataType.getChildren().toArray(new DataType[0]);
+        return dataType.getLogicalType()
+                .accept(
+                        new LogicalTypeDefaultVisitor<>() {
+                            @Override
+                            public ResolvedSchema visit(RowType rowType) {
+                                return expandCompositeType(rowType, fieldDataTypes);
+                            }
+
+                            @Override
+                            public ResolvedSchema visit(StructuredType structuredType) {
+                                return expandCompositeType(structuredType, fieldDataTypes);
+                            }
+
+                            @Override
+                            public ResolvedSchema visit(DistinctType distinctType) {
+                                return distinctType.getSourceType().accept(this);
+                            }
+
+                            @Override
+                            protected ResolvedSchema defaultMethod(LogicalType logicalType) {
+                                throw new IllegalArgumentException("Expected a composite type");
+                            }
+                        });
+    }
+
+    private static ResolvedSchema expandLegacyCompositeType(DataType dataType) {
+        // legacy composite type
+        CompositeType<?> compositeType =
+                (CompositeType<?>)
+                        ((LegacyTypeInformationType<?>) dataType.getLogicalType())
+                                .getTypeInformation();
+
+        String[] fieldNames = compositeType.getFieldNames();
+        DataType[] fieldTypes =
+                Arrays.stream(fieldNames)
+                        .map(compositeType::getTypeAt)
+                        .map(TypeConversions::fromLegacyInfoToDataType)
+                        .toArray(DataType[]::new);
+
+        return ResolvedSchema.physical(fieldNames, fieldTypes);
+    }
+
+    private static ResolvedSchema expandCompositeType(
+            LogicalType compositeType, DataType[] fieldDataTypes) {
+        final String[] fieldNames = getFieldNames(compositeType).toArray(new String[0]);
+        return ResolvedSchema.physical(fieldNames, fieldDataTypes);
+    }
+
     /**
      * Retrieves a nested field from a composite type at given position.
      *
@@ -306,50 +277,89 @@ public final class DataTypeUtils {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Logical type utilities
+    // --------------------------------------------------------------------------------------------
+
     /**
-     * The {@link DataType} class can only partially verify the conversion class. This method can
-     * perform the final check when we know if the data type should be used for input.
+     * Replaces the {@link LogicalType} of a {@link DataType}, i.e., it keeps the bridging class.
      */
-    public static void validateInputDataType(DataType dataType) {
-        dataType.accept(DataTypeInputClassValidator.INSTANCE);
+    public static DataType replaceLogicalType(DataType dataType, LogicalType replacement) {
+        return LogicalTypeDataTypeConverter.toDataType(replacement)
+                .bridgedTo(dataType.getConversionClass());
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Conversion class utilities
+    // --------------------------------------------------------------------------------------------
+
     /**
-     * The {@link DataType} class can only partially verify the conversion class. This method can
-     * perform the final check when we know if the data type should be used for output.
+     * Creates a {@link DataType} from the given {@link LogicalType} with internal data structures.
      */
-    public static void validateOutputDataType(DataType dataType) {
-        dataType.accept(DataTypeOutputClassValidator.INSTANCE);
+    public static DataType toInternalDataType(LogicalType logicalType) {
+        final DataType defaultDataType = TypeConversions.fromLogicalToDataType(logicalType);
+        return toInternalDataType(defaultDataType);
     }
 
-    /** Returns a PROCTIME data type. */
-    public static DataType createProctimeDataType() {
-        return new AtomicDataType(new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3));
+    /** Creates a {@link DataType} from the given {@link DataType} with internal data structures. */
+    public static DataType toInternalDataType(DataType dataType) {
+        return dataType.bridgedTo(toInternalConversionClass(dataType.getLogicalType()));
+    }
+
+    /** Checks whether a given data type is an internal data structure. */
+    public static boolean isInternal(DataType dataType) {
+        return isInternal(dataType, true);
+    }
+
+    /** Checks whether a given data type is an internal data structure. */
+    public static boolean isInternal(DataType dataType, boolean autobox) {
+        final Class<?> clazz;
+        if (autobox) {
+            clazz = primitiveToWrapper(dataType.getConversionClass());
+        } else {
+            clazz = dataType.getConversionClass();
+        }
+
+        return clazz == toInternalConversionClass(dataType.getLogicalType());
     }
 
     /**
-     * {@link ResolvedSchema#toPhysicalRowDataType()} erases time attributes. This method keeps them
-     * during conversion for very specific use cases mostly in Table API.
+     * Checks whether this data type and its children use the {@link
+     * LogicalType#getDefaultConversion()} defined by the logical type.
      */
-    public static DataType fromResolvedSchemaPreservingTimeAttributes(
-            ResolvedSchema resolvedSchema) {
-        final List<String> fieldNames = resolvedSchema.getColumnNames();
-        final List<DataType> fieldTypes = resolvedSchema.getColumnDataTypes();
-        return DataTypes.ROW(
-                        IntStream.range(0, fieldNames.size())
-                                .mapToObj(
-                                        pos ->
-                                                DataTypes.FIELD(
-                                                        fieldNames.get(pos), fieldTypes.get(pos)))
-                                .collect(Collectors.toList()))
-                .notNull();
+    public static boolean isDefaultClassNested(DataType dataType) {
+        return isDefaultClass(dataType)
+                && dataType.getChildren().stream().allMatch(DataTypeUtils::isDefaultClassNested);
     }
 
-    private DataTypeUtils() {
-        // no instantiation
+    /**
+     * Checks whether this data type uses the {@link LogicalType#getDefaultConversion()} defined by
+     * the logical type.
+     */
+    public static boolean isDefaultClass(DataType dataType) {
+        return Objects.equals(
+                dataType.getConversionClass(), dataType.getLogicalType().getDefaultConversion());
     }
 
-    // ------------------------------------------------------------------------------------------
+    // --------------------------------------------------------------------------------------------
+    // Data type validation
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * The {@link DataType} class can only partially verify the conversion class. This method can
+     * perform the final check when we know if the data type should be used for input.
+     */
+    public static void validateInputDataType(DataType dataType) {
+        dataType.accept(DataTypeInputClassValidator.INSTANCE);
+    }
+
+    /**
+     * The {@link DataType} class can only partially verify the conversion class. This method can
+     * perform the final check when we know if the data type should be used for output.
+     */
+    public static void validateOutputDataType(DataType dataType) {
+        dataType.accept(DataTypeOutputClassValidator.INSTANCE);
+    }
 
     private static class DataTypeInputClassValidator extends DataTypeDefaultVisitor<Void> {
 
@@ -388,6 +398,47 @@ public final class DataTypeUtils {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Data type transformations
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Transforms the given data type to a different data type using the given transformations.
+     *
+     * @see #transform(DataTypeFactory, DataType, TypeTransformation...)
+     */
+    public static DataType transform(
+            DataType typeToTransform, TypeTransformation... transformations) {
+        return transform(null, typeToTransform, transformations);
+    }
+
+    /**
+     * Transforms the given data type to a different data type using the given transformations.
+     *
+     * <p>The transformations will be called in the given order. In case of constructed or composite
+     * types, a transformation will be applied transitively to children first.
+     *
+     * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can
+     * be transformed.
+     *
+     * @param factory {@link DataTypeFactory} if available
+     * @param typeToTransform data type to be transformed.
+     * @param transformations the transformations to transform data type to another type.
+     * @return the new data type
+     */
+    public static DataType transform(
+            @Nullable DataTypeFactory factory,
+            DataType typeToTransform,
+            TypeTransformation... transformations) {
+        Preconditions.checkArgument(
+                transformations.length > 0, "transformations should not be empty.");
+        DataType newType = typeToTransform;
+        for (TypeTransformation transformation : transformations) {
+            newType = newType.accept(new DataTypeTransformer(factory, transformation));
+        }
+        return newType;
+    }
+
     /**
      * Transforms a {@link DataType}.
      *
@@ -543,53 +594,81 @@ public final class DataTypeUtils {
         }
     }
 
-    private static ResolvedSchema expandCompositeType(FieldsDataType dataType) {
-        DataType[] fieldDataTypes = dataType.getChildren().toArray(new DataType[0]);
-        return dataType.getLogicalType()
-                .accept(
-                        new LogicalTypeDefaultVisitor<ResolvedSchema>() {
-                            @Override
-                            public ResolvedSchema visit(RowType rowType) {
-                                return expandCompositeType(rowType, fieldDataTypes);
-                            }
+    // --------------------------------------------------------------------------------------------
+    // Structured type alignment
+    // --------------------------------------------------------------------------------------------
 
-                            @Override
-                            public ResolvedSchema visit(StructuredType structuredType) {
-                                return expandCompositeType(structuredType, fieldDataTypes);
-                            }
+    /**
+     * Aligns the {@link DataType} and its nested conversion classes with the given {@link
+     * StructuredType#getImplementationClass()}.
+     *
+     * <p>By default, a data type is created from a {@link LogicalType} and uses default conversion
+     * classes. But for conversion to the implementation class, the data type must reflect the
+     * correct expected classes (e.g. {@code List<Integer>} instead of {@code Integer[]}) for all
+     * attributes.
+     */
+    public static DataType alignStructuredTypes(DataTypeFactory factory, DataType dataType) {
+        return dataType.accept(new StructuredTypeAligner(factory));
+    }
 
-                            @Override
-                            public ResolvedSchema visit(DistinctType distinctType) {
-                                return distinctType.getSourceType().accept(this);
-                            }
+    private static class StructuredTypeAligner implements DataTypeVisitor<DataType> {
 
-                            @Override
-                            protected ResolvedSchema defaultMethod(LogicalType logicalType) {
-                                throw new IllegalArgumentException("Expected a composite type");
-                            }
-                        });
-    }
+        private final DataTypeFactory factory;
 
-    private static ResolvedSchema expandLegacyCompositeType(DataType dataType) {
-        // legacy composite type
-        CompositeType<?> compositeType =
-                (CompositeType<?>)
-                        ((LegacyTypeInformationType<?>) dataType.getLogicalType())
-                                .getTypeInformation();
+        public StructuredTypeAligner(DataTypeFactory factory) {
+            this.factory = factory;
+        }
 
-        String[] fieldNames = compositeType.getFieldNames();
-        DataType[] fieldTypes =
-                Arrays.stream(fieldNames)
-                        .map(compositeType::getTypeAt)
-                        .map(TypeConversions::fromLegacyInfoToDataType)
-                        .toArray(DataType[]::new);
+        @Override
+        public DataType visit(AtomicDataType atomicDataType) {
+            return atomicDataType;
+        }
 
-        return ResolvedSchema.physical(fieldNames, fieldTypes);
+        @Override
+        public DataType visit(CollectionDataType collectionDataType) {
+            return new CollectionDataType(
+                    collectionDataType.getLogicalType(),
+                    collectionDataType.getConversionClass(),
+                    collectionDataType.getElementDataType().accept(this));
+        }
+
+        @Override
+        public DataType visit(FieldsDataType fieldsDataType) {
+            final LogicalType logicalType = fieldsDataType.getLogicalType();
+            if (logicalType.is(LogicalTypeRoot.STRUCTURED_TYPE)
+                    && logicalType.getDefaultConversion() != StructuredType.FALLBACK_CONVERSION) {
+                return alignDataType(fieldsDataType, (StructuredType) logicalType);
+            }
+            return new FieldsDataType(
+                    fieldsDataType.getLogicalType(),
+                    fieldsDataType.getConversionClass(),
+                    fieldsDataType.getChildren().stream()
+                            .map(f -> f.accept(this))
+                            .collect(Collectors.toList()));
+        }
+
+        @Override
+        public DataType visit(KeyValueDataType keyValueDataType) {
+            return new KeyValueDataType(
+                    keyValueDataType.getLogicalType(),
+                    keyValueDataType.getConversionClass(),
+                    keyValueDataType.getKeyDataType().accept(this),
+                    keyValueDataType.getValueDataType().accept(this));
+        }
+
+        private DataType alignDataType(DataType defaultDataType, StructuredType structuredType) {
+            final Class<?> implementationClass =
+                    structuredType.getImplementationClass().orElseThrow(IllegalStateException::new);
+            try {
+                return DataTypeExtractor.extractFromStructuredClass(factory, implementationClass);
+            } catch (ValidationException e) {
+                // Necessary for legacy code paths and tests written in Scala
+                return defaultDataType;
+            }
+        }
     }
 
-    private static ResolvedSchema expandCompositeType(
-            LogicalType compositeType, DataType[] fieldDataTypes) {
-        final String[] fieldNames = getFieldNames(compositeType).toArray(new String[0]);
-        return ResolvedSchema.physical(fieldNames, fieldDataTypes);
+    private DataTypeUtils() {
+        // no instantiation
     }
 }
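
For illustration only (not part of the patch): a minimal sketch of how the new
helper is meant to be used, assuming a DataTypeFactory named `factory` and a
hypothetical POJO `MyPojo` with a non-default attribute such as
`public List<Integer> elements;`.

    // Extraction reflects the POJO's declared conversion classes (List<Integer>).
    DataType extracted = DataTypes.of(MyPojo.class).toDataType(factory);

    // Re-deriving from the logical type alone falls back to defaults (Integer[]).
    DataType defaults = DataTypes.of(extracted.getLogicalType());

    // Alignment restores the implementation-class conversions, so `aligned`
    // equals `extracted` again.
    DataType aligned = DataTypeUtils.alignStructuredTypes(factory, defaults);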
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
index 430111543bc..f4ad686cd98 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/utils/DataTypeUtilsTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.types.utils;
 
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
@@ -234,4 +236,40 @@ class DataTypeUtilsTest {
                                 + java.util.HashMap.class.getName()
                                 + "'.");
     }
+
+    @Test
+    void testStructuredTypeAlignment() {
+        final DataTypeFactoryMock factoryMock = new DataTypeFactoryMock();
+
+        final DataType expectedDataType = DataTypes.of(NestedPojo.class).toDataType(factoryMock);
+
+        // Erases conversion classes and uses default conversion classes
+        final DataType defaultDataType = DataTypes.of(expectedDataType.getLogicalType());
+
+        final DataType dataType =
+                DataTypeUtils.alignStructuredTypes(
+                        factoryMock, DataTypes.ROW(DataTypes.INT(), defaultDataType));
+        assertThat(dataType).isEqualTo(DataTypes.ROW(DataTypes.INT(), expectedDataType));
+    }
+
+    /** POJO that does not use default conversions for certain data types. */
+    public static class Pojo {
+        // Primitive instead of boxed class
+        public int i;
+
+        // List instead of array and java.sql.Timestamp instead of java.time.LocalDateTime
+        public List<Timestamp> timestamps;
+
+        // byte[] as annotated STRING data type
+        @DataTypeHint("STRING")
+        public byte[] string;
+    }
+
+    /** Nested POJO that does not use default conversions for certain data types. */
+    public static class NestedPojo {
+
+        public Tuple2<Boolean, Double> tuple;
+
+        public List<Pojo> pojoList;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index e5bcfa075ce..411470f5dbc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.connector.RowLevelModificationScanContext;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
 import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
@@ -139,7 +140,7 @@ public final class DynamicSinkUtils {
         final ContextResolvedTable contextResolvedTable =
                 ContextResolvedTable.anonymous("collect", catalogTable);
 
-        final DataType consumedDataType = fixCollectDataType(dataTypeFactory, schema);
+        final DataType consumedDataType = deriveCollectDataType(dataTypeFactory, schema);
 
         final String zone = configuration.get(TableConfigOptions.LOCAL_TIME_ZONE);
         final ZoneId zoneId =
@@ -897,17 +898,23 @@ public final class DynamicSinkUtils {
 
     // --------------------------------------------------------------------------------------------
 
-    /** Temporary solution until we drop legacy types. */
-    private static DataType fixCollectDataType(
+    /**
+     * Prepares a {@link DataType} for {@link DataStructureConverter} from {@link ResolvedSchema}.
+     */
+    private static DataType deriveCollectDataType(
             DataTypeFactory dataTypeFactory, ResolvedSchema schema) {
+        // TODO erase the conversion class earlier when dropping legacy code, esp. FLINK-22321
         final DataType fixedDataType =
                 DataTypeUtils.transform(
                         dataTypeFactory,
                         schema.toSourceRowDataType(),
                         TypeTransformations.legacyRawToTypeInfoRaw(),
                         TypeTransformations.legacyToNonLegacy());
-        // TODO erase the conversion class earlier when dropping legacy code, esp. FLINK-22321
-        return TypeConversions.fromLogicalToDataType(fixedDataType.getLogicalType());
+        final DataType defaultDataType =
+                TypeConversions.fromLogicalToDataType(fixedDataType.getLogicalType());
+
+        // Structured types might not use default conversion classes.
+        return DataTypeUtils.alignStructuredTypes(dataTypeFactory, defaultDataType);
     }
 
     /**
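
A hedged sketch of why this matters downstream (hypothetical sink-side code;
`context` and `rowData` are assumed to be in scope): the converter created from
the consumed data type must emit the classes that the user's structured type
declares.

    // With the aligned data type, conversion to the external format produces
    // e.g. a List attribute instead of an Object[] inside the structured type.
    DataStructureConverter converter =
            context.createDataStructureConverter(consumedDataType);
    Object external = converter.toExternal(rowData);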
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
index 8593493bac3..28009f1a236 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.DataTypes.Field;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
@@ -30,7 +31,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.types.utils.DataTypeUtils.isInternal;
@@ -96,7 +96,6 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     // ROW, STRUCTURED_TYPE, DISTINCT_TYPE
     static final String FIELD_NAME_FIELDS = "fields";
     static final String FIELD_NAME_FIELD_NAME = "name";
-    static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
 
     DataTypeJsonSerializer() {
         super(DataType.class);
@@ -106,7 +105,7 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     public void serialize(
             DataType dataType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
             throws IOException {
-        if (isDefaultClassNested(dataType)) {
+        if (DataTypeUtils.isDefaultClassNested(dataType)) {
             serializerProvider.defaultSerializeValue(dataType.getLogicalType(), jsonGenerator);
         } else {
             jsonGenerator.writeStartObject();
@@ -120,7 +119,7 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     private static void serializeClass(DataType dataType, JsonGenerator jsonGenerator)
             throws IOException {
         // skip the conversion class if only nested types contain custom conversion classes
-        if (!isDefaultClass(dataType)) {
+        if (!DataTypeUtils.isDefaultClass(dataType)) {
             jsonGenerator.writeStringField(
                     FIELD_NAME_CONVERSION_CLASS, dataType.getConversionClass().getName());
         }
@@ -149,7 +148,10 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
             case STRUCTURED_TYPE:
                 final List<Field> nonDefaultFields =
                         DataType.getFields(dataType).stream()
-                                .filter(field -> !isDefaultClassNested(field.getDataType()))
+                                .filter(
+                                        field ->
+                                                !DataTypeUtils.isDefaultClassNested(
+                                                        field.getDataType()))
                                 .collect(Collectors.toList());
                 if (nonDefaultFields.isEmpty()) {
                     break;
@@ -167,7 +169,7 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
                 break;
             case DISTINCT_TYPE:
                 final DataType sourceDataType = dataType.getChildren().get(0);
-                if (!isDefaultClassNested(sourceDataType)) {
+                if (!DataTypeUtils.isDefaultClassNested(sourceDataType)) {
                     serializeClass(sourceDataType, jsonGenerator);
                 }
                 break;
@@ -178,22 +180,11 @@ final class DataTypeJsonSerializer extends StdSerializer<DataType> {
 
     private static void serializeFieldIfNotDefaultClass(
             DataType dataType, String fieldName, JsonGenerator jsonGenerator) throws IOException {
-        if (!isDefaultClassNested(dataType)) {
+        if (!DataTypeUtils.isDefaultClassNested(dataType)) {
             jsonGenerator.writeFieldName(fieldName);
             jsonGenerator.writeStartObject();
             serializeClass(dataType, jsonGenerator);
             jsonGenerator.writeEndObject();
         }
     }
-
-    private static boolean isDefaultClassNested(DataType dataType) {
-        return isDefaultClass(dataType)
-                && dataType.getChildren().stream()
-                        .allMatch(DataTypeJsonSerializer::isDefaultClassNested);
-    }
-
-    private static boolean isDefaultClass(DataType dataType) {
-        return Objects.equals(
-                dataType.getConversionClass(), dataType.getLogicalType().getDefaultConversion());
-    }
 }
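
For reference, a small sketch of what the relocated predicates check (semantics
as defined by the removed private helpers above; `bridgedTo` is existing
DataType API):

    DataType defaultArray = DataTypes.ARRAY(DataTypes.INT());          // Integer[]
    DataType listArray = defaultArray.bridgedTo(java.util.List.class); // custom class

    // DataTypeUtils.isDefaultClassNested(defaultArray) -> true
    //   (the serializer writes only the logical type)
    // DataTypeUtils.isDefaultClassNested(listArray) -> false
    //   (the serializer additionally writes the conversion class)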
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java
index 61bfcd01eb2..d7d6558143c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java
@@ -23,6 +23,10 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.types.logical.StructuredType;
 
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Stream;
 
 /** Tests for functions dealing with {@link StructuredType}. */
@@ -45,7 +49,7 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                         // Same value from CAST
                         .testSqlResult(
                                 "Type1Constructor(f0, f1) = CAST((14, 'Bob') AS "
-                                        + Type1.TYPE
+                                        + Type1.TYPE_STRING
                                         + ")",
                                 true,
                                 DataTypes.BOOLEAN())
@@ -57,7 +61,7 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                         // Different value from CAST
                         .testSqlResult(
                                 "Type1Constructor(f0, f1) = CAST((15, 'Alice') AS "
-                                        + Type1.TYPE
+                                        + Type1.TYPE_STRING
                                         + ")",
                                 false,
                                 DataTypes.BOOLEAN())
@@ -68,7 +72,7 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                         // Different class name from CAST
                         .testSqlValidationError(
                                 "Type1Constructor(f0, f1) = CAST((14, 'Bob') AS "
-                                        + Type2.TYPE
+                                        + Type2.TYPE_STRING
                                         + ")",
                                 "Incompatible structured types")
                         // Same class name but different fields
@@ -83,9 +87,24 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
                                         "NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = CAST("
                                                 + "(CAST((14, 'Bob') AS %s), CAST((15, 'Alice') AS %s))"
                                                 + " AS %s)",
-                                        Type1.TYPE, Type2.TYPE, NestedType.TYPE),
+                                        Type1.TYPE_STRING,
+                                        Type2.TYPE_STRING,
+                                        NestedType.TYPE_STRING),
                                 true,
-                                DataTypes.BOOLEAN()));
+                                DataTypes.BOOLEAN())
+                        .testSqlResult(
+                                String.format(
+                                        "CAST((42, ARRAY['1', '2', '3'], MAP['A', TIMESTAMP '2025-06-20 12:00:01', 'B', TIMESTAMP '2025-06-20 12:00:02']) AS %s)",
+                                        NonDefaultType.TYPE_STRING),
+                                new NonDefaultType(
+                                        42,
+                                        List.of("1", "2", "3"),
+                                        Map.of(
+                                                "A",
+                                                Timestamp.valueOf("2025-06-20 12:00:01"),
+                                                "B",
+                                                Timestamp.valueOf("2025-06-20 12:00:02"))),
+                                DataTypes.of(NonDefaultType.class).notNull()));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -94,8 +113,8 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
 
     /** Structured type Type1. */
     public static class Type1 {
-        private static final String TYPE =
-                "STRUCTURED<'" + Type1.class.getName() + "', a INT, b STRING>";
+        private static final String TYPE_STRING =
+                String.format("STRUCTURED<'%s', a INT, b STRING>", Type1.class.getName());
 
         public Integer a;
         public String b;
@@ -112,8 +131,8 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
 
     /** Structured type Type2. */
     public static class Type2 {
-        private static final String TYPE =
-                "STRUCTURED<'" + Type2.class.getName() + "', a INT, b STRING>";
+        private static final String TYPE_STRING =
+                String.format("STRUCTURED<'%s', a INT, b STRING>", Type2.class.getName());
 
         public Integer a;
         public String b;
@@ -130,11 +149,10 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
 
     /** Structured type NestedType. */
     public static class NestedType {
-        private static final String TYPE =
+        private static final String TYPE_STRING =
                 String.format(
-                        "STRUCTURED<'" + NestedType.class.getName() + "', n1 %s, n2 %s>",
-                        Type1.TYPE,
-                        Type2.TYPE);
+                        "STRUCTURED<'%s', n1 %s, n2 %s>",
+                        NestedType.class.getName(), Type1.TYPE_STRING, Type2.TYPE_STRING);
 
         public Type1 n1;
         public Type2 n2;
@@ -148,4 +166,39 @@ public class StructuredFunctionsITCase extends BuiltInFunctionTestBase {
             }
         }
     }
+
+    /**
+     * Structured type that does not use default conversion classes but uses e.g. {@link List} for
+     * arrays and primitive types.
+     */
+    public static class NonDefaultType {
+        private static final String TYPE_STRING =
+                String.format(
+                        "STRUCTURED<'%s', i INT NOT NULL, l ARRAY<STRING>, m MAP<STRING, TIMESTAMP(9)>>",
+                        NonDefaultType.class.getName());
+
+        public final int i;
+        public final List<String> l;
+        public final Map<String, Timestamp> m;
+
+        public NonDefaultType(int i, List<String> l, Map<String, Timestamp> m) {
+            this.i = i;
+            this.l = l;
+            this.m = m;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final NonDefaultType that = (NonDefaultType) o;
+            return i == that.i && Objects.equals(l, that.l) && Objects.equals(m, that.m);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(i, l, m);
+        }
+    }
 }
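
A hypothetical end-to-end sketch of what the fix enables (assuming a
TableEnvironment `env` and the NonDefaultType class above): collecting an inline
structured type now yields the implementation class with its declared,
non-default conversions.

    try (CloseableIterator<Row> it =
            env.sqlQuery(
                            String.format(
                                    "SELECT CAST((42, ARRAY['1'], MAP['A', TIMESTAMP '2025-06-20 12:00:01']) AS %s)",
                                    NonDefaultType.TYPE_STRING))
                    .execute()
                    .collect()) {
        // The collected field is a NonDefaultType instance whose attributes use
        // List and java.sql.Timestamp rather than the default conversions.
        Object value = it.next().getField(0);
    }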
