This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 75355decc3f [FLINK-38103][table] Fix error for inline structured types in DataTypeFactory 75355decc3f is described below commit 75355decc3fe2684ae4f366cf782a5fcb4bf2b97 Author: Timo Walther <twal...@apache.org> AuthorDate: Tue Jul 15 10:07:17 2025 +0200 [FLINK-38103][table] Fix error for inline structured types in DataTypeFactory This closes #26797. --- .../flink/table/catalog/DataTypeFactoryImpl.java | 36 ++++++---------------- .../types/logical/utils/LogicalTypeDuplicator.java | 25 +++++++-------- .../table/types/LogicalTypeDuplicatorTest.java | 29 ++++++++++------- .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java | 2 +- 4 files changed, 41 insertions(+), 51 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java index 1fce774a81d..86e264cbabb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java @@ -34,8 +34,7 @@ import org.apache.flink.table.types.UnresolvedDataType; import org.apache.flink.table.types.extraction.DataTypeExtractor; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; -import org.apache.flink.table.types.logical.UnresolvedUserDefinedType; -import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.logical.utils.LogicalTypeParser; import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter; @@ -49,8 +48,6 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa @Internal final class DataTypeFactoryImpl implements 
DataTypeFactory { - private final LogicalTypeResolver resolver = new LogicalTypeResolver(); - private final ClassLoader classLoader; private final Supplier<SerializerConfig> serializerConfig; @@ -114,15 +111,18 @@ final class DataTypeFactoryImpl implements DataTypeFactory { @Override public LogicalType createLogicalType(String typeString) { final LogicalType parsedType = LogicalTypeParser.parse(typeString, classLoader); - return parsedType.accept(resolver); + if (LogicalTypeChecks.hasNested(parsedType, t -> t.is(LogicalTypeRoot.UNRESOLVED))) { + throw unsupportedUserDefinedTypes(); + } + return parsedType; } @Override public LogicalType createLogicalType(UnresolvedIdentifier identifier) { - if (!identifier.getDatabaseName().isPresent()) { + if (identifier.getDatabaseName().isEmpty()) { return createLogicalType(identifier.getObjectName()); } - return resolveType(identifier); + throw unsupportedUserDefinedTypes(); } // -------------------------------------------------------------------------------------------- @@ -146,25 +146,7 @@ final class DataTypeFactoryImpl implements DataTypeFactory { }; } - /** Resolves all {@link UnresolvedUserDefinedType}s. 
*/ - private class LogicalTypeResolver extends LogicalTypeDuplicator { - - @Override - protected LogicalType defaultMethod(LogicalType logicalType) { - if (logicalType.is(LogicalTypeRoot.UNRESOLVED)) { - final UnresolvedUserDefinedType unresolvedType = - (UnresolvedUserDefinedType) logicalType; - return resolveType(unresolvedType.getUnresolvedIdentifier()) - .copy(unresolvedType.isNullable()); - } - return logicalType; - } - } - - private LogicalType resolveType(UnresolvedIdentifier identifier) { - assert identifier != null; - // TODO validate implementation class of structured types when converting from LogicalType - // to DataType - throw new TableException("User-defined types are not supported yet."); + private TableException unsupportedUserDefinedTypes() { + return new TableException("User-defined types are not supported yet."); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java index 055f7a401eb..e57f67779df 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java @@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; /** @@ -94,7 +93,7 @@ public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType @Override public LogicalType visit(StructuredType structuredType) { - final StructuredType.Builder builder = instantiateStructuredBuilder(structuredType); + final StructuredType.Builder builder = createStructuredBuilder(structuredType); 
builder.attributes(duplicateStructuredAttributes(structuredType)); builder.setNullable(structuredType.isNullable()); builder.setFinal(structuredType.isFinal()); @@ -123,17 +122,19 @@ public class LogicalTypeDuplicator extends LogicalTypeDefaultVisitor<LogicalType // -------------------------------------------------------------------------------------------- - private StructuredType.Builder instantiateStructuredBuilder(StructuredType structuredType) { - final Optional<ObjectIdentifier> identifier = structuredType.getObjectIdentifier(); - final Optional<Class<?>> implementationClass = structuredType.getImplementationClass(); - if (identifier.isPresent() && implementationClass.isPresent()) { - return StructuredType.newBuilder(identifier.get(), implementationClass.get()); - } else if (identifier.isPresent()) { - return StructuredType.newBuilder(identifier.get()); - } else if (implementationClass.isPresent()) { - return StructuredType.newBuilder(implementationClass.get()); + private StructuredType.Builder createStructuredBuilder(StructuredType structuredType) { + final ObjectIdentifier identifier = structuredType.getObjectIdentifier().orElse(null); + final String className = structuredType.getClassName().orElse(null); + final Class<?> implementationClass = structuredType.getImplementationClass().orElse(null); + + if (identifier != null && implementationClass != null) { + return StructuredType.newBuilder(identifier, implementationClass); + } else if (identifier != null) { + return StructuredType.newBuilder(identifier); + } else if (implementationClass != null) { + return StructuredType.newBuilder(implementationClass); } else { - throw new TableException("Invalid structured type."); + return StructuredType.newBuilder(className); } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java index 
7862817d8b9..b68c9a1bdbe 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java @@ -29,8 +29,10 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator; @@ -39,7 +41,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; -import java.util.Collections; +import java.util.List; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -62,7 +64,8 @@ class LogicalTypeDuplicatorTest { Arguments.of( createDistinctType(new IntType()), createDistinctType(new BigIntType())), Arguments.of(createUserType(new IntType()), createUserType(new BigIntType())), - Arguments.of(createHumanType(), createHumanType())); + Arguments.of(createHumanType(), createHumanType()), + Arguments.of(createNonClassType(), createNonClassType())); } @ParameterizedTest(name = "{index}: {0}") @@ -107,17 +110,15 @@ class LogicalTypeDuplicatorTest { private static RowType createRowType(LogicalType replacedType) { return new RowType( Arrays.asList( - new RowType.RowField("field1", new CharType(2)), - new RowType.RowField("field2", new BooleanType()), - new RowType.RowField("field3", replacedType))); + new RowField("field1", new CharType(2)), + new RowField("field2", new 
BooleanType()), + new RowField("field3", replacedType))); } private static StructuredType createHumanType() { return StructuredType.newBuilder(ObjectIdentifier.of("cat", "db", "Human"), Human.class) .attributes( - Collections.singletonList( - new StructuredType.StructuredAttribute( - "name", new VarCharType(), "Description."))) + List.of(new StructuredAttribute("name", new VarCharType(), "Description."))) .description("Human type desc.") .setFinal(false) .setInstantiable(false) @@ -126,9 +127,7 @@ class LogicalTypeDuplicatorTest { private static StructuredType createUserType(LogicalType replacedType) { return StructuredType.newBuilder(ObjectIdentifier.of("cat", "db", "User"), User.class) - .attributes( - Collections.singletonList( - new StructuredType.StructuredAttribute("setting", replacedType))) + .attributes(List.of(new StructuredAttribute("setting", replacedType))) .description("User type desc.") .setFinal(false) .setInstantiable(true) @@ -136,10 +135,18 @@ class LogicalTypeDuplicatorTest { .build(); } + private static StructuredType createNonClassType() { + return StructuredType.newBuilder("NotInClassPathType") + .attributes(List.of(new StructuredAttribute("setting", new BooleanType()))) + .build(); + } + + @SuppressWarnings("unused") private abstract static class Human { public String name; } + @SuppressWarnings("unused") private static final class User extends Human { public int setting; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java index e2ce6e19958..674dd940dcc 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java @@ -294,7 +294,7 @@ public class LogicalTypeJsonSerdeTest { "f2", new VarCharType(200), "desc"))) .build(), // inline structured type with class name only - StructuredType.newBuilder(PojoClass.class.getName()) + StructuredType.newBuilder("NotInClassPathPojo") .attributes( Arrays.asList( new StructuredAttribute("f0", new IntType(true)),