twalthr commented on code in PR #26806:
URL: https://github.com/apache/flink/pull/26806#discussion_r2222743469


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##########
@@ -2488,4 +2489,48 @@ public OutType percentile(InType percentage, InType 
frequency) {
                         objectToExpression(percentage),
                         objectToExpression(frequency)));
     }
+
+    /**
+     * Updates existing fields in a structured object by providing key-value 
pairs.
+     *
+     * <p>This function takes a structured object and updates specified fields 
with new values. The
+     * keys must be string literals that correspond to existing fields in the 
structured type. If a
+     * key does not exist in the input object, an exception will be thrown. If 
the value type is not
+     * compatible with the corresponding structured field type, an exception 
will also be thrown.
+     *
+     * <p>The function expects alternating key-value pairs where keys are 
field names (non-null
+     * strings) and values are the new values for those fields. At least one 
key-value pair must be
+     * provided.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * // Create a structured object representing a user
+     * User userObject = objectOf("com.example.User", "name", "Bob", "age", 
25);

Review Comment:
   These examples are a bit misleading. They read as if `User userObject = 
objectOf(...)` were Java code and the function returned an instance of the 
class `User`. The explanation in the docs is better.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateInputTypeStrategy.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Input type strategy for the {@link 
BuiltInFunctionDefinitions#OBJECT_UPDATE} function.
+ *
+ * <p>This strategy validates the input arguments for updating existing fields 
in a structured type:
+ *
+ * <ul>
+ *   <li>Ensures the function has an odd number of arguments (at least 3)
+ *   <li>Validates the first argument is a non-null structured type

Review Comment:
   ```suggestion
    *   <li>Validates the first argument is a non-null structured type
   ```
   The first argument doesn't need to be non-null. If it is null, null is 
returned. This is common for SQL functions.
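
A minimal plain-Java sketch of the null behaviour suggested above. This is not Flink code: `ObjectUpdateModel`, its `objectUpdate` helper, and the `Map`-based object representation are hypothetical stand-ins chosen for illustration. It assumes the argument shape is validated before the null check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical plain-Java model of the discussed semantics; not Flink's implementation. */
final class ObjectUpdateModel {

    /** Returns null for a null object; otherwise a copy with the given fields replaced. */
    static Map<String, Object> objectUpdate(Map<String, Object> object, Object... kv) {
        // At least one key-value pair is required, and keys and values must alternate.
        if (kv.length == 0 || kv.length % 2 != 0) {
            throw new IllegalArgumentException("Expected one or more key-value pairs.");
        }
        // SQL-style null propagation: a null input object yields a null result.
        if (object == null) {
            return null;
        }
        // Copy first so the input object is left unchanged.
        final Map<String, Object> updated = new LinkedHashMap<>(object);
        for (int i = 0; i < kv.length; i += 2) {
            if (!(kv[i] instanceof String)) {
                throw new IllegalArgumentException("Field keys must be non-null strings.");
            }
            final String key = (String) kv[i];
            // Only existing fields may be updated.
            if (!updated.containsKey(key)) {
                throw new IllegalArgumentException("Unknown field: " + key);
            }
            updated.put(key, kv[i + 1]);
        }
        return updated;
    }

    public static void main(String[] args) {
        final Map<String, Object> user = new LinkedHashMap<>();
        user.put("name", "Bob");
        user.put("age", 25);
        System.out.println(objectUpdate(user, "name", "Alice"));
        System.out.println(objectUpdate(null, "name", "Alice"));
    }
}
```

In this model a null object is not an error, while a malformed key-value list or an unknown field still is.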



##########
flink-python/pyflink/table/expression.py:
##########
@@ -2193,6 +2193,55 @@ def json_unquote(self) -> 'Expression':
         """
         return _unary_op("jsonUnquote")(self)
 
+    # ---------------------------- value modification functions 
-----------------------------
+
+    def object_update(self, *kv) -> "Expression":
+        """
+        Updates existing fields in a structured object by providing key-value 
pairs.
+
+        This function takes a structured object and updates specified fields 
with new values.
+        The keys must be string literals that correspond to existing fields in 
the structured type.
+        If a key does not exist in the input object, an exception will be 
thrown.
+        If the value type is not compatible with the corresponding structured 
field type,
+        an exception will also be thrown.
+
+        The function expects alternating key-value pairs where keys are field 
names
+        (non-null strings) and values are the new values for those fields.
+        At least one key-value pair must be provided.
+        The total number of arguments must be odd (object + pairs of key-value 
arguments).
+
+        The result type is the same structured type as the input, with the 
specified fields
+        updated to their new values.
+
+        Example:
+        ::
+
+            >>> # Update the 'name' field of a user object
+            >>> user_obj.object_update("name", "Alice")
+            >>> # Returns an updated user object with 'name' set to "Alice"
+            >>>
+            >>> # Update multiple fields
+            >>> user_obj.object_update("name", "Alice", "age", 30)
+            >>> # Returns an updated user object with 'name' set to "Alice" 
and 'age' set to 30
+
+        The result type is the same structured type as the input, with the 
specified
+        fields updated to their new values.
+
+        :param kv: key-value pairs where even-indexed elements are field names
+                   (strings) and odd-indexed elements are the new values for 
those
+                   fields
+        :return: expression representing the updated structured type with 
modified
+                 field values
+        """
+        gateway = get_gateway()
+        ApiExpressionUtils = (
+            gateway.jvm.org.apache.flink.table.expressions.ApiExpressionUtils
+        )
+        exprs = [
+            ApiExpressionUtils.objectToExpression(_get_java_expression(e)) for 
e in kv
+        ]

Review Comment:
   Is this pattern used somewhere else? Why not `_varargs_op`?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##########
@@ -2488,4 +2489,48 @@ public OutType percentile(InType percentage, InType 
frequency) {
                         objectToExpression(percentage),
                         objectToExpression(frequency)));
     }
+
+    /**
+     * Updates existing fields in a structured object by providing key-value 
pairs.
+     *
+     * <p>This function takes a structured object and updates specified fields 
with new values. The
+     * keys must be string literals that correspond to existing fields in the 
structured type. If a
+     * key does not exist in the input object, an exception will be thrown. If 
the value type is not
+     * compatible with the corresponding structured field type, an exception 
will also be thrown.
+     *
+     * <p>The function expects alternating key-value pairs where keys are 
field names (non-null
+     * strings) and values are the new values for those fields. At least one 
key-value pair must be
+     * provided.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * // Create a structured object representing a user
+     * User userObject = objectOf("com.example.User", "name", "Bob", "age", 
25);
+     *
+     * // Update the 'name' field of a user object
+     * User updatedUser1 = userObject.objectUpdate("name", "Alice")
+     *
+     * // Update multiple fields
+     * User updatedUser2 = userObject.objectUpdate("name", "Alice", "age", 30)
+     *
+     * }</pre>
+     *
+     * <p>The result type is the same structured type as the input, with the 
specified fields
+     * updated to their new values.
+     *
+     * @param kv key-value pairs where even-indexed elements are field names 
(strings) and
+     *     odd-indexed elements are the new values for those fields
+     * @return expression representing a new structured object with updated 
field values
+     * @see 
org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_UPDATE

Review Comment:
   Remove this reference? It is rather internal.
   ```suggestion
        * @see 
org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_UPDATE
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateTypeStrategy.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} 
function.
+ *
+ * <p>This strategy infers the return type for the OBJECT_UPDATE function by:
+ *
+ * <ul>
+ *   <li>Extracting the field definitions from the input structured type
+ *   <li>Resolving the class from the structured type
+ *   <li>

Review Comment:
   ```suggestion
   
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StructuredFunctionsITCase.java:
##########
@@ -197,6 +202,99 @@ private static Stream<TestSetSpec> objectOfTestCases() {
                                 "The first argument must be a non-nullable 
character string literal representing the class name."));
     }
 
+    private static Stream<TestSetSpec> objectUpdateTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.OBJECT_UPDATE)
+                        .onFieldsWithData(42, "Bob")
+                        .andDataTypes(DataTypes.INT(), DataTypes.STRING())
+                        .withFunction(Type1.Type1Constructor.class)
+                        .withFunction(Type2.Type2Constructor.class)
+                        .withFunction(NestedType.NestedConstructor.class)
+                        // Test update all fields equality
+                        .testResult(
+                                call("Type1Constructor", $("f0"), $("f1"))
+                                        .objectUpdate("a", 16, "b", "Alice"),
+                                "OBJECT_UPDATE(OBJECT_OF('"
+                                        + Type1.class.getName()

Review Comment:
   Add a test where the type does not exist on the classpath.



##########
docs/data/sql_functions.yml:
##########
@@ -1219,6 +1219,35 @@ valueconstruction:
   - table: NUMERIC.rows
     description: Creates a NUMERIC interval of rows (commonly used in window 
creation).
 
+valuemodification:
+  - sql: OBJECT_UPDATE(object, key, value [, key, value , ...])
+    table: OBJECT.objectUpdate(key, value [, key, value , ...])
+    description: |
+      Updates existing fields in a structured object by providing key-value 
pairs.
+      
+      This function takes a structured object and updates specified fields 
with new values.
+      The keys must be string literals that correspond to existing fields in 
the structured type.
+      If a key does not exist in the input object, an exception will be thrown.
+      If the value type is not compatible with the corresponding structured 
field type, 
+      an exception will also be thrown.
+      
+      The function expects alternating key-value pairs where keys are field 
names (non-null strings) 
+      and values are the new values for those fields. At least one key-value 
pair must be provided.
+      The total number of arguments must be odd (object + pairs of key-value 
arguments).
+      
+      The result type is the same structured type as the input, with the 
specified fields
+      updated to their new values.
+      
+      ```sql
+      -- Update the 'name' field of a user object
+      OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 
'name', 'Alice')
+      -- Returns: User{name='Alice', age=14})

Review Comment:
   ```suggestion
         -- Returns: User{name='Alice', age=14}
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateTypeStrategy.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Type strategy for the {@link BuiltInFunctionDefinitions#OBJECT_UPDATE} 
function.
+ *
+ * <p>This strategy infers the return type for the OBJECT_UPDATE function by:
+ *
+ * <ul>
+ *   <li>Extracting the field definitions from the input structured type
+ *   <li>Resolving the class from the structured type
+ *   <li>
+ *   <li>Creating a new structured type with the same class but updating field 
types to match the
+ *       types of the new values being assigned
+ * </ul>
+ *
+ * <p>The return type preserves the original class and field names but may 
have different field
+ * types than the input structured type, as field types are updated to match 
the types of the values
+ * being assigned during the update operation.
+ */
+@Internal
+public class ObjectUpdateTypeStrategy implements TypeStrategy {
+
+    private static Class<?> extractClass(
+            final CallContext callContext, final StructuredType 
structuredType) {
+        final DataTypeFactory dataTypeFactory = 
callContext.getDataTypeFactory();
+        final ClassLoader classLoader = dataTypeFactory.getClassLoader();
+        String className = 
structuredType.getClassName().orElseThrow(IllegalStateException::new);
+
+        return StructuredType.resolveClass(classLoader, className)
+                .orElseThrow(IllegalStateException::new);

Review Comment:
   ```suggestion
                   .orElseThrow(IllegalStateException::new);
   ```
   The class is optional.
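
One way to read this comment, sketched in plain Java: treat class resolution as something that may legitimately come back empty instead of throwing. `OptionalClassResolution`, `resolveClass`, and `describe` are hypothetical stand-ins built on `Class.forName`. They are not Flink's `StructuredType.resolveClass`, and the fallback shown is an assumption for illustration.

```java
import java.util.Optional;

/** Hypothetical stand-in showing class lookup that tolerates a missing class. */
final class OptionalClassResolution {

    /** Looks up a class by name; empty when the class is not on the classpath. */
    static Optional<Class<?>> resolveClass(ClassLoader classLoader, String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            return Optional.<Class<?>>of(Class.forName(className, false, classLoader));
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    /** Picks a code path without failing when the class cannot be resolved. */
    static String describe(ClassLoader classLoader, String className) {
        return resolveClass(classLoader, className)
                .map(clazz -> "class-backed: " + clazz.getName())
                .orElse("no implementation class: " + className);
    }

    public static void main(String[] args) {
        final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
        System.out.println(describe(classLoader, "java.lang.String"));
        System.out.println(describe(classLoader, "com.example.User"));
    }
}
```

A test for a type that is not on the classpath would exercise the empty branch.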



##########
docs/data/sql_functions.yml:
##########
@@ -1219,6 +1219,35 @@ valueconstruction:
   - table: NUMERIC.rows
     description: Creates a NUMERIC interval of rows (commonly used in window 
creation).
 
+valuemodification:
+  - sql: OBJECT_UPDATE(object, key, value [, key, value , ...])
+    table: OBJECT.objectUpdate(key, value [, key, value , ...])
+    description: |
+      Updates existing fields in a structured object by providing key-value 
pairs.
+      
+      This function takes a structured object and updates specified fields 
with new values.
+      The keys must be string literals that correspond to existing fields in 
the structured type.
+      If a key does not exist in the input object, an exception will be thrown.
+      If the value type is not compatible with the corresponding structured 
field type, 
+      an exception will also be thrown.
+      
+      The function expects alternating key-value pairs where keys are field 
names (non-null strings) 
+      and values are the new values for those fields. At least one key-value 
pair must be provided.
+      The total number of arguments must be odd (object + pairs of key-value 
arguments).
+      
+      The result type is the same structured type as the input, with the 
specified fields
+      updated to their new values.
+      
+      ```sql
+      -- Update the 'name' field of a user object
+      OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 
'name', 'Alice')
+      -- Returns: User{name='Alice', age=14})
+      
+      -- Update multiple fields
+      OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 
'name', 'Alice', 'age', 30)
+      -- Returns: User{name='Alice', age=30})

Review Comment:
   ```suggestion
         -- Returns: User{name='Alice', age=30}
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ObjectUpdateInputTypeStrategy.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Input type strategy for the {@link 
BuiltInFunctionDefinitions#OBJECT_UPDATE} function.
+ *
+ * <p>This strategy validates the input arguments for updating existing fields 
in a structured type:
+ *
+ * <ul>
+ *   <li>Ensures the function has an odd number of arguments (at least 3)
+ *   <li>Validates the first argument is a non-null structured type
+ *   <li>Validates that key arguments are non-null string literals
+ *   <li>Ensures field names are not repeated in the key-value pairs
+ *   <li>Ensures field names are part of the structured type's attributes
+ *   <li>Ensures field values match the expected types defined in the 
structured type
+ * </ul>
+ *
+ * <p>The expected signature is: {@code OBJECT_UPDATE(object, key1, value1, 
key2, value2, ...)}
+ */
+@Internal
+public class ObjectUpdateInputTypeStrategy implements InputTypeStrategy {
+
+    private static final ArgumentCount AT_LEAST_THREE_AND_ODD =
+            new ArgumentCount() {
+                @Override
+                public boolean isValidCount(final int count) {
+                    return count % 2 == 1;
+                }
+
+                @Override
+                public Optional<Integer> getMinCount() {
+                    return Optional.of(3);
+                }
+
+                @Override
+                public Optional<Integer> getMaxCount() {
+                    return Optional.empty();
+                }
+            };
+
+    private static StructuredType validateObjectArgument(final DataType 
firstArgumentType) {
+        final LogicalType firstArgumentLogicalType = 
firstArgumentType.getLogicalType();
+        if (!firstArgumentLogicalType.is(LogicalTypeRoot.STRUCTURED_TYPE)) {
+            throw new ValidationException(
+                    String.format(
+                            "The first argument must be a structured type, but 
was %s.",
+                            firstArgumentLogicalType));
+        }
+        return (StructuredType) firstArgumentLogicalType;
+    }
+
+    private static void validateKeyValueArguments(
+            final CallContext callContext,
+            final List<DataType> argumentDataTypes,
+            final StructuredType structuredType) {
+        final Set<String> fieldNames = new HashSet<>();
+        final Map<String, LogicalType> 
structuredTypeAttributeNameToLogicalType =
+                structuredType.getAttributes().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        StructuredAttribute::getName,
+                                        StructuredAttribute::getType));
+
+        for (int i = 1; i < argumentDataTypes.size(); i += 2) {
+            final String keyName =
+                    validateFieldNameArgument(
+                            callContext,
+                            argumentDataTypes,
+                            i,
+                            structuredTypeAttributeNameToLogicalType,
+                            fieldNames);
+
+            validateValueArgument(
+                    argumentDataTypes, i + 1, 
structuredTypeAttributeNameToLogicalType, keyName);
+        }
+    }
+
+    private static String validateFieldNameArgument(
+            final CallContext callContext,
+            final List<DataType> argumentDataTypes,
+            final int pos,
+            final Map<String, LogicalType> attributes,
+            final Set<String> fieldNames) {
+        final LogicalType fieldNameLogicalType = 
argumentDataTypes.get(pos).getLogicalType();
+        if (!fieldNameLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+            final String message =
+                    String.format(
+                            "The field key at position %d must be a non-null 
character string, but was %s.",
+                            pos + 1, fieldNameLogicalType.asSummaryString());
+            throw new ValidationException(message);
+        }
+
+        final String fieldName =
+                callContext
+                        .getArgumentValue(pos, String.class)
+                        .orElseThrow(
+                                () -> {
+                                    final String message =
+                                            String.format(
+                                                    "The field key at position 
%d must be a non-null character string literal.",
+                                                    pos + 1);
+                                    return new ValidationException(message);
+                                });
+
+        // validate that the field name is not repeated
+        if (!fieldNames.add(fieldName)) {
+            final String message =
+                    String.format(
+                            "The field name '%s' at position %d is repeated.", 
fieldName, pos + 1);
+            throw new ValidationException(message);
+        }
+
+        // validate that the field name is part of the structured type 
attributes
+        if (!attributes.containsKey(fieldName)) {
+            final String message =
+                    String.format(
+                            "The field name '%s' at position %d is not part of 
the structured type attributes. Available attributes: %s.",
+                            fieldName, pos + 1, attributes.keySet());
+            throw new ValidationException(message);
+        }
+
+        return fieldName;
+    }
+
+    private static void validateValueArgument(
+            final List<DataType> argumentDataTypes,
+            final int pos,
+            final Map<String, LogicalType> structuredTypeAttributes,
+            final String keyNameToBeUpdated) {
+        final DataType argumentValueDataType = argumentDataTypes.get(pos);
+        final LogicalType argumentValueLogicalType = 
argumentValueDataType.getLogicalType();
+
+        final LogicalType expectedType = 
structuredTypeAttributes.get(keyNameToBeUpdated);
+        final DataType expectedDataType = DataTypes.of(expectedType);
+
+        // Validate that the updated value type matches the expected type
+        if (!argumentValueLogicalType

Review Comment:
   This check can be purely logical, so `LogicalTypeCasts#supportsImplicitCast` 
could be an option. But I'm wondering whether we should remove this validation 
entirely; otherwise a CHAR(3) could not be stored in a CHAR(2). Any mutation 
should be allowed and checked by a different entity (e.g. during 
equals).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
