This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 75355decc3f [FLINK-38103][table] Fix error for inline structured types in DataTypeFactory
75355decc3f is described below

commit 75355decc3fe2684ae4f366cf782a5fcb4bf2b97
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Jul 15 10:07:17 2025 +0200

    [FLINK-38103][table] Fix error for inline structured types in DataTypeFactory
    
    This closes #26797.
---
 .../flink/table/catalog/DataTypeFactoryImpl.java   | 36 ++++++----------------
 .../types/logical/utils/LogicalTypeDuplicator.java | 25 +++++++--------
 .../table/types/LogicalTypeDuplicatorTest.java     | 29 ++++++++++-------
 .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java |  2 +-
 4 files changed, 41 insertions(+), 51 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
index 1fce774a81d..86e264cbabb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
@@ -34,8 +34,7 @@ import org.apache.flink.table.types.UnresolvedDataType;
 import org.apache.flink.table.types.extraction.DataTypeExtractor;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter;
 
@@ -49,8 +48,6 @@ import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
 @Internal
 final class DataTypeFactoryImpl implements DataTypeFactory {
 
-    private final LogicalTypeResolver resolver = new LogicalTypeResolver();
-
     private final ClassLoader classLoader;
 
     private final Supplier<SerializerConfig> serializerConfig;
@@ -114,15 +111,18 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
     @Override
     public LogicalType createLogicalType(String typeString) {
         final LogicalType parsedType = LogicalTypeParser.parse(typeString, classLoader);
-        return parsedType.accept(resolver);
+        if (LogicalTypeChecks.hasNested(parsedType, t -> t.is(LogicalTypeRoot.UNRESOLVED))) {
+            throw unsupportedUserDefinedTypes();
+        }
+        return parsedType;
     }
 
     @Override
     public LogicalType createLogicalType(UnresolvedIdentifier identifier) {
-        if (!identifier.getDatabaseName().isPresent()) {
+        if (identifier.getDatabaseName().isEmpty()) {
             return createLogicalType(identifier.getObjectName());
         }
-        return resolveType(identifier);
+        throw unsupportedUserDefinedTypes();
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -146,25 +146,7 @@ final class DataTypeFactoryImpl implements DataTypeFactory 
{
         };
     }
 
-    /** Resolves all {@link UnresolvedUserDefinedType}s. */
-    private class LogicalTypeResolver extends LogicalTypeDuplicator {
-
-        @Override
-        protected LogicalType defaultMethod(LogicalType logicalType) {
-            if (logicalType.is(LogicalTypeRoot.UNRESOLVED)) {
-                final UnresolvedUserDefinedType unresolvedType =
-                        (UnresolvedUserDefinedType) logicalType;
-                return resolveType(unresolvedType.getUnresolvedIdentifier())
-                        .copy(unresolvedType.isNullable());
-            }
-            return logicalType;
-        }
-    }
-
-    private LogicalType resolveType(UnresolvedIdentifier identifier) {
-        assert identifier != null;
-        // TODO validate implementation class of structured types when 
converting from LogicalType
-        //  to DataType
-        throw new TableException("User-defined types are not supported yet.");
+    private TableException unsupportedUserDefinedTypes() {
+        return new TableException("User-defined types are not supported yet.");
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
index 055f7a401eb..e57f67779df 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.StructuredType;
 import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -94,7 +93,7 @@ public class LogicalTypeDuplicator extends 
LogicalTypeDefaultVisitor<LogicalType
 
     @Override
     public LogicalType visit(StructuredType structuredType) {
-        final StructuredType.Builder builder = 
instantiateStructuredBuilder(structuredType);
+        final StructuredType.Builder builder = 
createStructuredBuilder(structuredType);
         builder.attributes(duplicateStructuredAttributes(structuredType));
         builder.setNullable(structuredType.isNullable());
         builder.setFinal(structuredType.isFinal());
@@ -123,17 +122,19 @@ public class LogicalTypeDuplicator extends 
LogicalTypeDefaultVisitor<LogicalType
 
     // 
--------------------------------------------------------------------------------------------
 
-    private StructuredType.Builder instantiateStructuredBuilder(StructuredType 
structuredType) {
-        final Optional<ObjectIdentifier> identifier = 
structuredType.getObjectIdentifier();
-        final Optional<Class<?>> implementationClass = 
structuredType.getImplementationClass();
-        if (identifier.isPresent() && implementationClass.isPresent()) {
-            return StructuredType.newBuilder(identifier.get(), 
implementationClass.get());
-        } else if (identifier.isPresent()) {
-            return StructuredType.newBuilder(identifier.get());
-        } else if (implementationClass.isPresent()) {
-            return StructuredType.newBuilder(implementationClass.get());
+    private StructuredType.Builder createStructuredBuilder(StructuredType 
structuredType) {
+        final ObjectIdentifier identifier = 
structuredType.getObjectIdentifier().orElse(null);
+        final String className = structuredType.getClassName().orElse(null);
+        final Class<?> implementationClass = 
structuredType.getImplementationClass().orElse(null);
+
+        if (identifier != null && implementationClass != null) {
+            return StructuredType.newBuilder(identifier, implementationClass);
+        } else if (identifier != null) {
+            return StructuredType.newBuilder(identifier);
+        } else if (implementationClass != null) {
+            return StructuredType.newBuilder(implementationClass);
         } else {
-            throw new TableException("Invalid structured type.");
+            return StructuredType.newBuilder(className);
         }
     }
 
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
index 7862817d8b9..b68c9a1bdbe 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
@@ -29,8 +29,10 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator;
 
@@ -39,7 +41,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -62,7 +64,8 @@ class LogicalTypeDuplicatorTest {
                 Arguments.of(
                         createDistinctType(new IntType()), 
createDistinctType(new BigIntType())),
                 Arguments.of(createUserType(new IntType()), createUserType(new 
BigIntType())),
-                Arguments.of(createHumanType(), createHumanType()));
+                Arguments.of(createHumanType(), createHumanType()),
+                Arguments.of(createNonClassType(), createNonClassType()));
     }
 
     @ParameterizedTest(name = "{index}: {0}")
@@ -107,17 +110,15 @@ class LogicalTypeDuplicatorTest {
     private static RowType createRowType(LogicalType replacedType) {
         return new RowType(
                 Arrays.asList(
-                        new RowType.RowField("field1", new CharType(2)),
-                        new RowType.RowField("field2", new BooleanType()),
-                        new RowType.RowField("field3", replacedType)));
+                        new RowField("field1", new CharType(2)),
+                        new RowField("field2", new BooleanType()),
+                        new RowField("field3", replacedType)));
     }
 
     private static StructuredType createHumanType() {
         return StructuredType.newBuilder(ObjectIdentifier.of("cat", "db", 
"Human"), Human.class)
                 .attributes(
-                        Collections.singletonList(
-                                new StructuredType.StructuredAttribute(
-                                        "name", new VarCharType(), 
"Description.")))
+                        List.of(new StructuredAttribute("name", new 
VarCharType(), "Description.")))
                 .description("Human type desc.")
                 .setFinal(false)
                 .setInstantiable(false)
@@ -126,9 +127,7 @@ class LogicalTypeDuplicatorTest {
 
     private static StructuredType createUserType(LogicalType replacedType) {
         return StructuredType.newBuilder(ObjectIdentifier.of("cat", "db", 
"User"), User.class)
-                .attributes(
-                        Collections.singletonList(
-                                new 
StructuredType.StructuredAttribute("setting", replacedType)))
+                .attributes(List.of(new StructuredAttribute("setting", 
replacedType)))
                 .description("User type desc.")
                 .setFinal(false)
                 .setInstantiable(true)
@@ -136,10 +135,18 @@ class LogicalTypeDuplicatorTest {
                 .build();
     }
 
+    private static StructuredType createNonClassType() {
+        return StructuredType.newBuilder("NotInClassPathType")
+                .attributes(List.of(new StructuredAttribute("setting", new 
BooleanType())))
+                .build();
+    }
+
+    @SuppressWarnings("unused")
     private abstract static class Human {
         public String name;
     }
 
+    @SuppressWarnings("unused")
     private static final class User extends Human {
         public int setting;
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index e2ce6e19958..674dd940dcc 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -294,7 +294,7 @@ public class LogicalTypeJsonSerdeTest {
                                                         "f2", new 
VarCharType(200), "desc")))
                                 .build(),
                         // inline structured type with class name only
-                        StructuredType.newBuilder(PojoClass.class.getName())
+                        StructuredType.newBuilder("NotInClassPathPojo")
                                 .attributes(
                                         Arrays.asList(
                                                 new StructuredAttribute("f0", 
new IntType(true)),

Reply via email to