twalthr commented on code in PR #26704: URL: https://github.com/apache/flink/pull/26704#discussion_r2197164667
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * Input type strategy for the {@code OBJECT_OF} function that validates argument types and counts. 
+ * + * <p>This strategy validates the input arguments for the {@code OBJECT_OF} function, ensuring: + * + * <ul> + * <li>The argument count is odd (className + pairs of key-value arguments) + * <li>The first argument is a STRING/VARCHAR representing the class name + * <li>All key arguments (odd positions after the first) are STRING/VARCHAR types + * <li>Field names are unique across all key-value pairs + * <li>Value arguments (even positions after the first) can be any type + * </ul> + * + * <p>The expected function signature is: {@code OBJECT_OF(className, key1, value1, key2, value2, + * ...)} + * + * <p>Validation rules: + * + * <ul> + * <li>Minimum 1 argument (just the class name) + * <li>Odd total number of arguments (className + key-value pairs) + * <li>Keys must be string literals for field name extraction + * <li>No duplicate field names allowed + * </ul> + * + * <p><b>Note: Users are responsible for providing a valid fully qualified class name that exists in + * the classpath. The class name should follow Java naming conventions. While this strategy + * validates the format and type of the class name argument, it does not verify the class existence + * in the classpath. If an invalid or non-existent class name is provided, the function will fall + * back to using Row.class as the type representation.</b> + * Review Comment: Remove ########## docs/data/sql_functions_zh.yml: ########## @@ -1259,6 +1259,24 @@ valueconstruction: ```sql f(columns => DESCRIPTOR(`col1`, `col2`), on_time => DESCRIPTOR(`ts`)) ``` + - sql: OBJECT_OF(className, [key, value [, key, value , ...]]) + table: objectOf(STRING, Object...) 
+ description: | + 根据键值对列表创建结构化对象。 + + 该函数通过给定的类名创建结构化类型的实例。 + 结构化类型通过提供交替的键值对来创建,其中键必须是 + 字符串字面量,值可以是任意表达式。 + + 用户有责任提供在类路径中存在的有效完全限定类名。 + 类名应遵循Java命名约定(例如,'com.example.User')。 + 如果提供的类名无效或不存在,该函数将回退到使用 + Row.class作为类型表示。 + + ```sql + -- 创建一个具有name="Bob"和age=42的User对象 + OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42) + ``` Review Comment: Not sure how good your Chinese is, but this is not your responsibility. You can simply copy the English docs over. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * Input type strategy for the {@code OBJECT_OF} function that validates argument types and counts. + * + * <p>This strategy validates the input arguments for the {@code OBJECT_OF} function, ensuring: + * + * <ul> + * <li>The argument count is odd (className + pairs of key-value arguments) + * <li>The first argument is a STRING/VARCHAR representing the class name + * <li>All key arguments (odd positions after the first) are STRING/VARCHAR types + * <li>Field names are unique across all key-value pairs + * <li>Value arguments (even positions after the first) can be any type + * </ul> + * + * <p>The expected function signature is: {@code OBJECT_OF(className, key1, value1, key2, value2, + * ...)} + * + * <p>Validation rules: + * + * <ul> + * <li>Minimum 1 argument (just the class name) + * <li>Odd total number of arguments (className + key-value pairs) + * <li>Keys must be string literals for field name extraction + * <li>No duplicate field names allowed + * </ul> + * + * <p><b>Note: Users are responsible for providing a valid fully qualified class name that exists in + * the classpath. The class name should follow Java naming conventions. 
While this strategy + * validates the format and type of the class name argument, it does not verify the class existence + * in the classpath. If an invalid or non-existent class name is provided, the function will fall + * back to using Row.class as the type representation.</b> + * + * <p>Example valid calls: + * + * <ul> + * <li>{@code OBJECT_OF('com.example.User')} - empty object + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice')} - single field + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} - multiple fields + * </ul> + * + * @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_OF + * @see ObjectOfTypeStrategy + */ +public class ObjectOfInputTypeStrategy implements InputTypeStrategy { + + private static final ArgumentCount AT_LEAST_ONE_ODD = + new ArgumentCount() { + @Override + public boolean isValidCount(final int count) { + return count % 2 == 1; + } + + @Override + public Optional<Integer> getMinCount() { + return Optional.of(1); + } + + @Override + public Optional<Integer> getMaxCount() { + return Optional.empty(); + } + }; + + private static void validateClassArgument(final DataType firstArgumentDataType) { + final LogicalType classArgumentType = firstArgumentDataType.getLogicalType(); + + final String errorMessage = + "The first argument must be a non-nullable STRING/VARCHAR type representing the class name."; + if (classArgumentType.isNullable() + || !classArgumentType.is(LogicalTypeFamily.CHARACTER_STRING)) { + throw new ValidationException(errorMessage); + } + } + + private static void validateKeyArguments( + final CallContext callContext, final List<DataType> argumentDataTypes) { + final Set<String> fieldNames = new HashSet<>(); + for (int i = 1; i < argumentDataTypes.size(); i += 2) { + final LogicalType fieldNameLogicalType = argumentDataTypes.get(i).getLogicalType(); + validateFieldNameArgument(callContext, i, fieldNameLogicalType, fieldNames); + } + } + + private static void 
validateFieldNameArgument( + final CallContext callContext, + final int idx, + final LogicalType logicalType, + final Set<String> fieldNames) { + final int keyIndex = idx + 1; + if (!logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) { + throw new ValidationException( + "The field key at position " + + keyIndex + + " must be a non-nullable STRING/VARCHAR type, but was " + + logicalType.asSummaryString() + + "."); + } + + if (logicalType.isNullable()) { + throw new ValidationException( + "The field key at position " + + keyIndex + + " must be a non-nullable STRING/VARCHAR type."); + } + + final String fieldName = + callContext + .getArgumentValue(idx, String.class) + .orElseThrow(IllegalStateException::new); + if (!fieldNames.add(fieldName)) { + throw new ValidationException( + "The field name '" + fieldName + "' at position " + keyIndex + " is repeated."); + } + } + + @Override + public ArgumentCount getArgumentCount() { + return AT_LEAST_ONE_ODD; + } + + @Override + public Optional<List<DataType>> inferInputTypes( + final CallContext callContext, final boolean throwOnFailure) { + final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); + + validateClassArgument(argumentDataTypes.get(0)); + validateKeyArguments(callContext, argumentDataTypes); + + return Optional.of(argumentDataTypes); + } + + @Override + public List<Signature> getExpectedSignatures(final FunctionDefinition definition) { + // OBJECT_OF expects: name, key1, value1, key2, value2, ... 
+ // OBJECT_OF(<name>, [<key>, <value> [, <key>, <value> , ...]] ) + final List<Signature.Argument> arguments = new ArrayList<>(); + + // Class name (required) + arguments.add(Signature.Argument.of("STRING")); Review Comment: nit: `Argument.of("class name", "STRING")` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java: ########## @@ -88,14 +96,72 @@ Stream<TestSetSpec> getTestSetSpecs() { DataTypes.BOOLEAN())); } + private static Stream<TestSetSpec> objectOfTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.OBJECT_OF) + .onFieldsWithData(42, "Bob") + .andDataTypes(DataTypes.INT(), DataTypes.STRING()) + .withFunction(Type1.Type1Constructor.class) + .withFunction(Type2.Type2Constructor.class) + .withFunction(NestedType.NestedConstructor.class) + // Test with OBJECT_OF + .testResult( + objectOf(Type1.class, "a", 42, "b", "Bob"), + "OBJECT_OF('" + Type1.class.getName() + "', 'a', 42, 'b', 'Bob')", + Row.of(42, "Bob"), + Type1.STRUCTURED_TYPE) + // Test with same value from function + .testSqlResult( + "Type1Constructor(f0, f1) = OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob')", + true, + DataTypes.BOOLEAN()) + // Test with nested structured types + .testSqlResult( + "NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = " + + "OBJECT_OF('" + + NestedType.class.getName() + + "', 'n1', OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob'), " + + "'n2', OBJECT_OF('" + + Type2.class.getName() + + "', 'a', 15, 'b', 'Alice'))", + true, + DataTypes.BOOLEAN()) + // Test with TYPEOF + .testSqlResult( + "TYPEOF(OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob'))", + Type1.TYPE, + DataTypes.STRING()) + // Invalid Test - field name is not a string literal + .testSqlValidationError( + "OBJECT_OF('" + + Type1.class.getName() + + "', CAST(NULL AS STRING), 42, 'b', 'Bob')", + "The field key at 
position 2 must be a non-nullable STRING/VARCHAR type.") + // Invalid Test - first argument is type string but null + .testSqlValidationError( + "OBJECT_OF(CAST(NULL AS STRING), 'a', '12', 'b', 'Alice')", + "The first argument must be a non-nullable STRING/VARCHAR type representing the class name.")); + } + // -------------------------------------------------------------------------------------------- // Helpers // -------------------------------------------------------------------------------------------- /** Structured type Type1. */ public static class Type1 { private static final String TYPE = Review Comment: ```suggestion private static final String TYPE_STRING = ``` ########## flink-python/pyflink/table/expressions.py: ########## @@ -560,6 +562,45 @@ def map_from_arrays(key, value) -> Expression: return _binary_op("mapFromArrays", key, value) +def object_of(class_name: Union[str, 'JavaClass'], *args) -> Expression: Review Comment: For Python we only support `str` for now. ########## docs/data/sql_functions.yml: ########## @@ -1178,6 +1178,24 @@ valueconstruction: ```sql f(columns => DESCRIPTOR(`col1`, `col2`), on_time => DESCRIPTOR(`ts`)) ``` + - sql: OBJECT_OF(className, [key, value [, key, value , ...]]) + table: objectOf(STRING, Object...) + description: | + Creates a structured object from a list of key-value pairs. + + The function creates an instance of a structured type identified by the given class name. + The structured type is created by providing alternating key-value pairs where keys must be + string literals and values can be arbitrary expressions. + + Users are responsible for providing a valid fully qualified class name that exists + in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User'). Review Comment: The class name does not need to exist in the class path. It is only used for distinguishing two objects with identical fields. Only when leaving the SQL world to Table API (e.g. 
for collecting results or UDFs) it should match a class name. But also this is optional if you don't want to use your desired class. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +/** + * Type strategy for the {@code OBJECT_OF} function that infers the output type as a structured + * type. + * + * <p>This strategy creates a {@link DataTypes#STRUCTURED} type based on the provided class name and + * key-value pairs. 
The function signature is: {@code OBJECT_OF(className, key1, value1, key2, + * value2, ...)} + * + * <p>The strategy performs the following operations: + * + * <ul> + * <li>Extracts the class name from the first argument (must be a string literal) + * <li>Processes key-value pairs starting from the second argument + * <li>Extracts field names from odd-positioned arguments (indices 1, 3, 5, ...) + * <li>Normalizes field types from even-positioned arguments (indices 2, 4, 6, ...) + * <li>Creates a structured type with the given class name and normalized fields + * </ul> + * + * <p>Field type normalization includes: + * + * <ul> + * <li>Converting CHARACTER_STRING types (CHAR, VARCHAR) to nullable VARCHAR (STRING) + * <li>Making all field types nullable for flexibility in structured types + * </ul> + * + * <p>The strategy returns {@code Optional.empty()} if the class name argument is not available as a + * literal value (e.g., during type inference testing scenarios). + * + * <p><b>Examples:</b> + * + * <ul> + * <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} → {@code + * STRUCTURED<com.example.User>(name STRING, age INT)} + * <li>{@code OBJECT_OF('com.example.Point', 'x', 1.5, 'y', 2.0)} → {@code + * STRUCTURED<com.example.Point>(x DOUBLE, y DOUBLE)} + * </ul> + * + * <p><b>Implementation Notes:</b> + * + * <ul> + * <li>Field names must be available as string literals during type inference + * <li>The class name is used for type identification but the runtime representation is RowData + * <li>Uses {@link IntStream} for efficient processing of key-value pairs + * </ul> + * + * @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_OF + * @see ObjectOfInputTypeStrategy + */ +public class ObjectOfTypeStrategy implements TypeStrategy { + + private static DataType toStructuredType( + final String className, final CallContext callContext) { + final List<DataType> argumentDataTypes1 = callContext.getArgumentDataTypes(); + final 
DataTypes.Field[] fields = + IntStream.range(1, argumentDataTypes1.size()) + .filter(i -> i % 2 == 1) + .mapToObj(keyIdx -> toFieldDataType(callContext, keyIdx)) + .toArray(DataTypes.Field[]::new); + + return DataTypes.STRUCTURED(className, fields); + } + + private static Field toFieldDataType(final CallContext callContext, final int keyIdx) { + final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); + + final String fieldName = + callContext + .getArgumentValue(keyIdx, String.class) + .orElseThrow(IllegalStateException::new); + + final DataType fieldValueType = argumentDataTypes.get(keyIdx + 1); + final DataType normalizedFieldValueType = normalizeFieldType(fieldValueType); + return DataTypes.FIELD(fieldName, normalizedFieldValueType); + } + + /** + * Normalizes field types for structured types: - Converts CHARACTER_STRING types (CHAR, + * VARCHAR) to VARCHAR (STRING) - Makes all types nullable for flexibility in structured types. + */ + private static DataType normalizeFieldType(DataType fieldType) { + final LogicalType logicalType = fieldType.getLogicalType(); + + if (logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) { + return DataTypes.STRING().nullable(); + } Review Comment: Makes sense to me. We have special cases for exactly BINARY/CHAR at other locations like `org.apache.flink.table.planner.calcite.FlinkTypeSystem#shouldConvertRaggedUnionTypesToVarying` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +/** + * Type strategy for the {@code OBJECT_OF} function that infers the output type as a structured + * type. + * + * <p>This strategy creates a {@link DataTypes#STRUCTURED} type based on the provided class name and + * key-value pairs. The function signature is: {@code OBJECT_OF(className, key1, value1, key2, + * value2, ...)} + * + * <p>The strategy performs the following operations: + * + * <ul> + * <li>Extracts the class name from the first argument (must be a string literal) + * <li>Processes key-value pairs starting from the second argument + * <li>Extracts field names (keys) from odd-positioned arguments (indices 1, 3, 5, ...) + * <li>Normalizes field types (values) from even-positioned arguments (indices 2, 4, 6, ...) 
+ * <li>Creates a structured type with the given class name and normalized fields + * </ul> + * + * <p>Field type normalization includes: + * + * <ul> + * <li>Converting CHARACTER_STRING types (CHAR, VARCHAR) to nullable VARCHAR (STRING) + * <li>Making all field types nullable for flexibility in structured types + * </ul> + * + * <p>The strategy returns {@code Optional.empty()} if the class name argument is not available as a + * literal value. Review Comment: Input type strategy should have checked that already? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfTypeStrategy.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +/** + * Type strategy for the {@code OBJECT_OF} function that infers the output type as a structured + * type. + * + * <p>This strategy creates a {@link DataTypes#STRUCTURED} type based on the provided class name and + * key-value pairs. The function signature is: {@code OBJECT_OF(className, key1, value1, key2, + * value2, ...)} + * + * <p>The strategy performs the following operations: + * + * <ul> + * <li>Extracts the class name from the first argument (must be a string literal) + * <li>Processes key-value pairs starting from the second argument + * <li>Extracts field names (keys) from odd-positioned arguments (indices 1, 3, 5, ...) + * <li>Normalizes field types (values) from even-positioned arguments (indices 2, 4, 6, ...) + * <li>Creates a structured type with the given class name and normalized fields + * </ul> + * + * <p>Field type normalization includes: + * + * <ul> + * <li>Converting CHARACTER_STRING types (CHAR, VARCHAR) to nullable VARCHAR (STRING) + * <li>Making all field types nullable for flexibility in structured types + * </ul> + * + * <p>The strategy returns {@code Optional.empty()} if the class name argument is not available as a + * literal value. + * + * <p><b>Note: Users are responsible for providing a valid fully qualified class name that exists in + * the classpath. The class name should follow Java naming conventions. 
If an invalid or + * non-existent class name is provided, the function will fall back to using Row.class as the type + * representation.</b> Review Comment: ```suggestion * <p><b>Note: Users are responsible for providing a valid fully qualified class name that exists in * the classpath. The class name should follow Java naming conventions. If an invalid or * non-existent class name is provided, the function will fall back to using Row.class as the type * representation.</b> ``` remove ########## flink-python/pyflink/table/expressions.py: ########## @@ -560,6 +562,45 @@ def map_from_arrays(key, value) -> Expression: return _binary_op("mapFromArrays", key, value) +def object_of(class_name: Union[str, 'JavaClass'], *args) -> Expression: + """ + Creates a structured object from a list of key-value pairs. + + This function creates an instance of a structured type identified by the given class name. + The structured type is created by providing alternating key-value pairs where keys must be + string literals and values can be arbitrary expressions. + + This function corresponds to the SQL `OBJECT_OF` function. + + Note: Users are responsible for providing a valid fully qualified class name that exists + in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User'). + If an invalid or non-existent class name is provided, the function will fall back to using + Row.class as the type representation. + + :param class_name: The fully qualified class name or the JavaClass object. + :param args: Alternating key-value pairs: key1, value1, key2, value2, ... 
+ :return: A structured object expression + + Examples: + :: + + >>> # Creates a User object with name="Alice" and age=30 + >>> object_of("com.example.User", "name", "Alice", "age", 30) + + >>> # Using JavaClass (loaded via java gateway) + >>> from pyflink.util.java_utils import load_java_class + >>> user_class = load_java_class("com.example.User") + >>> object_of(user_class, "name", "Bob", "age", 25) + + .. seealso:: SQL function: OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 25) Review Comment: ```suggestion """ Creates a structured object from a list of key-value pairs. This function creates an instance of a structured type identified by the given class name. The structured type is created by providing alternating key-value pairs where keys must be string literals and values can be arbitrary expressions. Note: The class name is only used for distinguishing two structured types with identical fields. Structured types are internally handled with suitable data structures. Thus, serialization and equality checks (e.g. {@code hashCode/equals}) are managed by the system. In Table API and UDF calls, the system will attempt to resolve the class name to an actual implementation class. In this case the class name needs to be present in the user classpath. If resolution fails, {@link Row} is used as a fallback. :param class_name: The fully qualified class name :param args: Alternating key-value pairs: key1, value1, key2, value2, ... :return: A structured object expression Examples: :: >>> # Creates a User object with name="Alice" and age=30 >>> object_of("com.example.User", "name", "Alice", "age", 30) .. seealso:: SQL function: OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 25) ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java: ########## @@ -1006,7 +1006,7 @@ public static <T> DataType STRUCTURED(Class<T> implementationClass, Field... 
fie * @see DataTypes#of(Class) * @see StructuredType */ - public static <T> DataType STRUCTURED(String className, Field... fields) { + public static DataType STRUCTURED(String className, Field... fields) { Review Comment: Good catch! ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java: ########## @@ -88,14 +96,72 @@ Stream<TestSetSpec> getTestSetSpecs() { DataTypes.BOOLEAN())); } + private static Stream<TestSetSpec> objectOfTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.OBJECT_OF) + .onFieldsWithData(42, "Bob") + .andDataTypes(DataTypes.INT(), DataTypes.STRING()) + .withFunction(Type1.Type1Constructor.class) + .withFunction(Type2.Type2Constructor.class) + .withFunction(NestedType.NestedConstructor.class) + // Test with OBJECT_OF + .testResult( + objectOf(Type1.class, "a", 42, "b", "Bob"), + "OBJECT_OF('" + Type1.class.getName() + "', 'a', 42, 'b', 'Bob')", + Row.of(42, "Bob"), + Type1.STRUCTURED_TYPE) + // Test with same value from function + .testSqlResult( + "Type1Constructor(f0, f1) = OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob')", + true, + DataTypes.BOOLEAN()) + // Test with nested structured types + .testSqlResult( + "NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = " + + "OBJECT_OF('" + + NestedType.class.getName() + + "', 'n1', OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob'), " + + "'n2', OBJECT_OF('" + + Type2.class.getName() + + "', 'a', 15, 'b', 'Alice'))", + true, + DataTypes.BOOLEAN()) + // Test with TYPEOF + .testSqlResult( + "TYPEOF(OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob'))", + Type1.TYPE, + DataTypes.STRING()) + // Invalid Test - field name is not a string literal + .testSqlValidationError( + "OBJECT_OF('" + + Type1.class.getName() + + "', CAST(NULL AS STRING), 42, 'b', 'Bob')", + "The field key at position 2 must be a non-nullable 
STRING/VARCHAR type.") + // Invalid Test - first argument is type string but null + .testSqlValidationError( + "OBJECT_OF(CAST(NULL AS STRING), 'a', '12', 'b', 'Alice')", + "The first argument must be a non-nullable STRING/VARCHAR type representing the class name.")); + } + // -------------------------------------------------------------------------------------------- // Helpers // -------------------------------------------------------------------------------------------- /** Structured type Type1. */ public static class Type1 { private static final String TYPE = - "STRUCTURED<'" + Type1.class.getName() + "', a INT, b STRING>"; + "STRUCTURED<'" + Type1.class.getName() + "', `a` INT, `b` STRING>"; + private static final DataType STRUCTURED_TYPE = Review Comment: ```suggestion private static final DataType DATA_TYPE = ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java: ########## @@ -88,14 +96,72 @@ Stream<TestSetSpec> getTestSetSpecs() { DataTypes.BOOLEAN())); } + private static Stream<TestSetSpec> objectOfTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.OBJECT_OF) + .onFieldsWithData(42, "Bob") + .andDataTypes(DataTypes.INT(), DataTypes.STRING()) + .withFunction(Type1.Type1Constructor.class) + .withFunction(Type2.Type2Constructor.class) + .withFunction(NestedType.NestedConstructor.class) + // Test with OBJECT_OF + .testResult( + objectOf(Type1.class, "a", 42, "b", "Bob"), + "OBJECT_OF('" + Type1.class.getName() + "', 'a', 42, 'b', 'Bob')", + Row.of(42, "Bob"), + Type1.STRUCTURED_TYPE) + // Test with same value from function + .testSqlResult( + "Type1Constructor(f0, f1) = OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob')", + true, + DataTypes.BOOLEAN()) + // Test with nested structured types + .testSqlResult( + "NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = " + + "OBJECT_OF('" + + 
NestedType.class.getName() + + "', 'n1', OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob'), " + + "'n2', OBJECT_OF('" + + Type2.class.getName() + + "', 'a', 15, 'b', 'Alice'))", + true, + DataTypes.BOOLEAN()) + // Test with TYPEOF + .testSqlResult( + "TYPEOF(OBJECT_OF('" + + Type1.class.getName() + + "', 'a', 42, 'b', 'Bob'))", + Type1.TYPE, + DataTypes.STRING()) Review Comment: already covered by first test, can be removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org