twalthr commented on code in PR #26806: URL: https://github.com/apache/flink/pull/26806#discussion_r2222743469
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ########## @@ -2488,4 +2489,48 @@ public OutType percentile(InType percentage, InType frequency) { objectToExpression(percentage), objectToExpression(frequency))); } + + /** + * Updates existing fields in a structured object by providing key-value pairs. + * + * <p>This function takes a structured object and updates specified fields with new values. The + * keys must be string literals that correspond to existing fields in the structured type. If a + * key does not exist in the input object, an exception will be thrown. If the value type is not + * compatible with the corresponding structured field type, an exception will also be thrown. + * + * <p>The function expects alternating key-value pairs where keys are field names (non-null + * strings) and values are the new values for those fields. At least one key-value pair must be + * provided. + * + * <p>Example usage: + * + * <pre>{@code + * // Create a structured object representing a user + * User userObject = objectOf("com.example.User", "name", "Bob", "age", 25); Review Comment: These examples are a bit misleading. They read as if `User userObject = objectOf` is Java code and the function returns a class instance `User`. The explanation in docs is better. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateInputTypeStrategy.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Input type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} function. + * + * <p>This strategy validates the input arguments for updating existing fields in a structured type: + * + * <ul> + * <li>Ensures the function has an odd number of arguments (at least 3) + * <li>Validates the first argument is a non-null structured type Review Comment: ```suggestion * <li>Validates the first argument is a non-null structured type ``` doesn't need to be non-null. if null, null is returned. this is common for SQL functions. ########## flink-python/pyflink/table/expression.py: ########## @@ -2193,6 +2193,55 @@ def json_unquote(self) -> 'Expression': """ return _unary_op("jsonUnquote")(self) + # ---------------------------- value modification functions ----------------------------- + + def object_update(self, *kv) -> "Expression": + """ + Updates existing fields in a structured object by providing key-value pairs. + + This function takes a structured object and updates specified fields with new values. + The keys must be string literals that correspond to existing fields in the structured type. + If a key does not exist in the input object, an exception will be thrown. + If the value type is not compatible with the corresponding structured field type, + an exception will also be thrown. + + The function expects alternating key-value pairs where keys are field names + (non-null strings) and values are the new values for those fields. + At least one key-value pair must be provided. + The total number of arguments must be odd (object + pairs of key-value arguments). + + The result type is the same structured type as the input, with the specified fields + updated to their new values. + + Example: + :: + + >>> # Update the 'name' field of a user object + >>> user_obj.object_update("name", "Alice") + >>> # Returns an updated user object with 'name' set to "Alice" + >>> + >>> # Update multiple fields + >>> user_obj.object_update("name", "Alice", "age", 30) + >>> # Returns an updated user object with 'name' set to "Alice" and 'age' set to 30 + + The result type is the same structured type as the input, with the specified + fields updated to their new values. 
+ + :param kv: key-value pairs where even-indexed elements are field names + (strings) and odd-indexed elements are the new values for those + fields + :return: expression representing the updated structured type with modified + field values + """ + gateway = get_gateway() + ApiExpressionUtils = ( + gateway.jvm.org.apache.flink.table.expressions.ApiExpressionUtils + ) + exprs = [ + ApiExpressionUtils.objectToExpression(_get_java_expression(e)) for e in kv + ] Review Comment: Is this pattern used somewhere else? why not `_varargs_op`? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java: ########## @@ -2488,4 +2489,48 @@ public OutType percentile(InType percentage, InType frequency) { objectToExpression(percentage), objectToExpression(frequency))); } + + /** + * Updates existing fields in a structured object by providing key-value pairs. + * + * <p>This function takes a structured object and updates specified fields with new values. The + * keys must be string literals that correspond to existing fields in the structured type. If a + * key does not exist in the input object, an exception will be thrown. If the value type is not + * compatible with the corresponding structured field type, an exception will also be thrown. + * + * <p>The function expects alternating key-value pairs where keys are field names (non-null + * strings) and values are the new values for those fields. At least one key-value pair must be + * provided. + * + * <p>Example usage: + * + * <pre>{@code + * // Create a structured object representing a user + * User userObject = objectOf("com.example.User", "name", "Bob", "age", 25); + * + * // Update the 'name' field of a user object + * User updatedUser1 = userObject.objectUpdate("name", "Alice") + * + * // Update multiple fields + * User updatedUser2 = userObject.objectUpdate("name", "Alice", "age", 30) + * + * }</pre> + * + * <p>The result type is the same structured type as the input, with the specified fields + * updated to their new values. + * + * @param kv key-value pairs where even-indexed elements are field names (strings) and + * odd-indexed elements are the new values for those fields + * @return expression representing a new structured object with updated field values + * @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_UPDATE Review Comment: remove this reference? rather internal ```suggestion * @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_UPDATE ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateTypeStrategy.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} function. + * + * <p>This strategy infers the return type for the OBJECT_UPDATE function by: + * + * <ul> + * <li>Extracting the field definitions from the input structured type + * <li>Resolving the class from the structured type + * <li> Review Comment: ```suggestion ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java: ########## @@ -197,6 +202,99 @@ private static Stream<TestSetSpec> objectOfTestCases() { "The first argument must be a non-nullable character string literal representing the class name.")); } + private static Stream<TestSetSpec> objectUpdateTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.OBJECT_UPDATE) + .onFieldsWithData(42, "Bob") + .andDataTypes(DataTypes.INT(), DataTypes.STRING()) + .withFunction(Type1.Type1Constructor.class) + .withFunction(Type2.Type2Constructor.class) + .withFunction(NestedType.NestedConstructor.class) + // Test update all fields equality + .testResult( + call("Type1Constructor", $("f0"), $("f1")) + .objectUpdate("a", 16, "b", "Alice"), + "OBJECT_UPDATE(OBJECT_OF('" + + Type1.class.getName() Review Comment: add a test where type does not exist in classpath ########## docs/data/sql_functions.yml: ########## @@ -1219,6 +1219,35 @@ valueconstruction: - table: NUMERIC.rows description: Creates a NUMERIC interval of rows (commonly used in window creation). +valuemodification: + - sql: OBJECT_UPDATE(object, key, value [, key, value , ...]) + table: OBJECT.objectUpdate(key, value [, key, value , ...]) + description: | + Updates existing fields in a structured object by providing key-value pairs. + + This function takes a structured object and updates specified fields with new values. + The keys must be string literals that correspond to existing fields in the structured type. + If a key does not exist in the input object, an exception will be thrown. + If the value type is not compatible with the corresponding structured field type, + an exception will also be thrown. + + The function expects alternating key-value pairs where keys are field names (non-null strings) + and values are the new values for those fields. At least one key-value pair must be provided. + The total number of arguments must be odd (object + pairs of key-value arguments). + + The result type is the same structured type as the input, with the specified fields + updated to their new values. 
+ + ```sql + -- Update the 'name' field of a user object + OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 'name', 'Alice') + -- Returns: User{name='Alice', age=14}) Review Comment: ```suggestion -- Returns: User{name='Alice', age=14} ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateTypeStrategy.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.table.types.utils.TypeConversions; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} function. + * + * <p>This strategy infers the return type for the OBJECT_UPDATE function by: + * + * <ul> + * <li>Extracting the field definitions from the input structured type + * <li>Resolving the class from the structured type + * <li> + * <li>Creating a new structured type with the same class but updating field types to match the + * types of the new values being assigned + * </ul> + * + * <p>The return type preserves the original class and field names but may have different field + * types than the input structured type, as field types are updated to match the types of the values + * being assigned during the update operation. 
+ */ +@Internal +public class ObjectUpdateTypeStrategy implements TypeStrategy { + + private static Class<?> extractClass( + final CallContext callContext, final StructuredType structuredType) { + final DataTypeFactory dataTypeFactory = callContext.getDataTypeFactory(); + final ClassLoader classLoader = dataTypeFactory.getClassLoader(); + String className = structuredType.getClassName().orElseThrow(IllegalStateException::new); + + return StructuredType.resolveClass(classLoader, className) + .orElseThrow(IllegalStateException::new); Review Comment: ```suggestion .orElseThrow(IllegalStateException::new); ``` class is optional ########## docs/data/sql_functions.yml: ########## @@ -1219,6 +1219,35 @@ valueconstruction: - table: NUMERIC.rows description: Creates a NUMERIC interval of rows (commonly used in window creation). +valuemodification: + - sql: OBJECT_UPDATE(object, key, value [, key, value , ...]) + table: OBJECT.objectUpdate(key, value [, key, value , ...]) + description: | + Updates existing fields in a structured object by providing key-value pairs. + + This function takes a structured object and updates specified fields with new values. + The keys must be string literals that correspond to existing fields in the structured type. + If a key does not exist in the input object, an exception will be thrown. + If the value type is not compatible with the corresponding structured field type, + an exception will also be thrown. + + The function expects alternating key-value pairs where keys are field names (non-null strings) + and values are the new values for those fields. At least one key-value pair must be provided. + The total number of arguments must be odd (object + pairs of key-value arguments). + + The result type is the same structured type as the input, with the specified fields + updated to their new values. + + ```sql + -- Update the 'name' field of a user object + OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 'name', 'Alice') + -- Returns: User{name='Alice', age=14}) + + -- Update multiple fields + OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 'name', 'Alice', 'age', 30) + -- Returns: User{name='Alice', age=30}) Review Comment: ```suggestion -- Returns: User{name='Alice', age=30} ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateInputTypeStrategy.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Input type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} function. + * + * <p>This strategy validates the input arguments for updating existing fields in a structured type: + * + * <ul> + * <li>Ensures the function has an odd number of arguments (at least 3) + * <li>Validates the first argument is a non-null structured type + * <li>Validates that key arguments are non-null string literals + * <li>Ensures field names are not repeated in the key-value pairs + * <li>Ensures field names are part of the structured type's attributes + * <li>Ensures field values match the expected types defined in the structured type + * </ul> + * + * <p>The expected signature is: {@code OBJECT_UPDATE(object, key1, value1, key2, value2, ...)} + */ +@Internal +public class ObjectUpdateInputTypeStrategy implements InputTypeStrategy { + + private static final ArgumentCount AT_LEAST_THREE_AND_ODD = + new ArgumentCount() { + @Override + public boolean isValidCount(final int count) { + return count % 2 == 1; + } + + @Override + public Optional<Integer> getMinCount() { + return Optional.of(3); + } + + @Override + public Optional<Integer> getMaxCount() { + return Optional.empty(); + } + }; + + private static StructuredType validateObjectArgument(final DataType firstArgumentType) { + final LogicalType firstArgumentLogicalType = firstArgumentType.getLogicalType(); + if (!firstArgumentLogicalType.is(LogicalTypeRoot.STRUCTURED_TYPE)) { + throw new ValidationException( + String.format( + "The first argument must be a structured type, but was %s.", + firstArgumentLogicalType)); + } + return (StructuredType) firstArgumentLogicalType; + } + + private static void validateKeyValueArguments( + final CallContext callContext, + final List<DataType> argumentDataTypes, + final StructuredType structuredType) { + final Set<String> fieldNames = new HashSet<>(); + final Map<String, LogicalType> structuredTypeAttributeNameToLogicalType = + structuredType.getAttributes().stream() + .collect( + Collectors.toMap( + StructuredAttribute::getName, + StructuredAttribute::getType)); + + for (int i = 1; i < argumentDataTypes.size(); i += 2) { + final String keyName = + validateFieldNameArgument( + callContext, + argumentDataTypes, + i, + structuredTypeAttributeNameToLogicalType, + fieldNames); + + 
validateValueArgument(
+                    argumentDataTypes, i + 1, structuredTypeAttributeNameToLogicalType, keyName);
+        }
+    }
+
+    private static String validateFieldNameArgument(
+            final CallContext callContext,
+            final List<DataType> argumentDataTypes,
+            final int pos,
+            final Map<String, LogicalType> attributes,
+            final Set<String> fieldNames) {
+        final LogicalType fieldNameLogicalType = argumentDataTypes.get(pos).getLogicalType();
+        if (!fieldNameLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+            final String message =
+                    String.format(
+                            "The field key at position %d must be a non-null character string, but was %s.",
+                            pos + 1, fieldNameLogicalType.asSummaryString());
+            throw new ValidationException(message);
+        }
+
+        final String fieldName =
+                callContext
+                        .getArgumentValue(pos, String.class)
+                        .orElseThrow(
+                                () -> {
+                                    final String message =
+                                            String.format(
+                                                    "The field key at position %d must be a non-null character string literal.",
+                                                    pos + 1);
+                                    return new ValidationException(message);
+                                });
+
+        // validate that the field name is not repeated
+        if (!fieldNames.add(fieldName)) {
+            final String message =
+                    String.format(
+                            "The field name '%s' at position %d is repeated.", fieldName, pos + 1);
+            throw new ValidationException(message);
+        }
+
+        // validate that the field name is part of the structured type attributes
+        if (!attributes.containsKey(fieldName)) {
+            final String message =
+                    String.format(
+                            "The field name '%s' at position %d is not part of the structured type attributes. Available attributes: %s.",
+                            fieldName, pos + 1, attributes.keySet());
+            throw new ValidationException(message);
+        }
+
+        return fieldName;
+    }
+
+    private static void validateValueArgument(
+            final List<DataType> argumentDataTypes,
+            final int pos,
+            final Map<String, LogicalType> structuredTypeAttributes,
+            final String keyNameToBeUpdated) {
+        final DataType argumentValueDataType = argumentDataTypes.get(pos);
+        final LogicalType argumentValueLogicalType = argumentValueDataType.getLogicalType();
+
+        final LogicalType expectedType = structuredTypeAttributes.get(keyNameToBeUpdated);
+        final DataType expectedDataType = DataTypes.of(expectedType);
+
+        // Validate that the updated value type matches the expected type
+        if (!argumentValueLogicalType
Review Comment: This check can be purely logical. So `LogicalTypeCasts#supportsImplicitCast` could be an option. But I'm wondering if we should remove this validation entirely. Otherwise a CHAR(3) could not be stored in a CHAR(2). Any mutation should be allowed and needs to be checked by a different entity (e.g. during equals).
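For illustration, a minimal sketch of the purely logical check suggested in the comment above, assuming Flink's `LogicalTypeCasts#supportsImplicitCast` utility; this is not part of the PR and only shows the shape such a validation could take:

```java
// Sketch only: replaces the exact-type comparison with implicit-cast semantics,
// as suggested in the review comment above. Assumes LogicalTypeCasts#supportsImplicitCast
// from flink-table-common is available on the classpath.
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

final class ValueTypeValidationSketch {

    static void validateAssignment(
            String fieldName, LogicalType valueType, LogicalType fieldType) {
        // Note: implicit casts do not narrow, so e.g. CHAR(3) -> CHAR(2) would still be
        // rejected here; dropping the check entirely, as the comment considers, would
        // defer such mutations to a later stage (e.g. equality checks).
        if (!LogicalTypeCasts.supportsImplicitCast(valueType, fieldType)) {
            throw new ValidationException(
                    String.format(
                            "The value for field '%s' of type %s cannot be implicitly cast to the declared type %s.",
                            fieldName,
                            valueType.asSummaryString(),
                            fieldType.asSummaryString()));
        }
    }
}
```

Compared to strict equality, such a check would at least admit widening updates (e.g. an INT value into a BIGINT attribute) while still flagging clearly incompatible values.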