This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cd43fc09bd [FLINK-34058][table] Support optional parameters for named 
arguments (#24183)
4cd43fc09bd is described below

commit 4cd43fc09bd6c2e4806792fa2cce71f54ec1a9dd
Author: Feng Jin <[email protected]>
AuthorDate: Mon Jan 29 23:40:07 2024 +0800

    [FLINK-34058][table] Support optional parameters for named arguments 
(#24183)
    
    
    ---------
    
    Co-authored-by: xuyang <[email protected]>
    Co-authored-by: Shengkai <[email protected]>
---
 .../types/extraction/BaseMappingExtractor.java     |  51 ++-
 .../extraction/FunctionSignatureTemplate.java      |  30 +-
 .../table/types/extraction/FunctionTemplate.java   |  15 +-
 .../types/extraction/TypeInferenceExtractor.java   |  22 +-
 .../flink/table/types/inference/TypeInference.java |  26 ++
 .../extraction/TypeInferenceExtractorTest.java     | 347 ++++++++++++++++++++-
 .../apache/calcite/sql2rel/SqlToRelConverter.java  |  21 +-
 .../planner/calcite/FlinkConvertletTable.java      |  39 +++
 .../planner/calcite/FlinkOperatorBinding.java      | 181 +++++++++++
 .../inference/CallBindingCallContext.java          |  21 +-
 .../inference/OperatorBindingCallContext.java      |  31 +-
 .../inference/TypeInferenceOperandChecker.java     |  41 ++-
 .../functions/sql/FlinkSqlOperatorTable.java       |   6 +
 .../operations/PlannerCallProcedureOperation.java  |   4 +-
 .../converters/SqlProcedureCallConverter.java      |  36 ++-
 .../planner/codegen/calls/StringCallGen.scala      |   5 +-
 .../factories/TestProcedureCatalogFactory.java     |  18 ++
 .../planner/runtime/stream/sql/FunctionITCase.java | 140 ++++++++-
 .../runtime/stream/sql/ProcedureITCase.java        |  20 +-
 19 files changed, 976 insertions(+), 78 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
index fdd12e1d795..6069a0f729c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
@@ -34,6 +34,7 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Parameter;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -143,6 +144,9 @@ abstract class BaseMappingExtractor {
                 // check if the method can be called
                 verifyMappingForMethod(correctMethod, 
collectedMappingsPerMethod, verification);
 
+                // check if we declare optional on a primitive type parameter
+                verifyOptionalOnPrimitiveParameter(correctMethod, 
collectedMappingsPerMethod);
+
                 // check if method strategies conflict with function strategies
                 collectedMappingsPerMethod.forEach(
                         (signature, result) -> putMapping(collectedMappings, 
signature, result));
@@ -323,6 +327,40 @@ abstract class BaseMappingExtractor {
                         verification.verify(method, signature.toClass(), 
result.toClass()));
     }
 
+    private void verifyOptionalOnPrimitiveParameter(
+            Method method,
+            Map<FunctionSignatureTemplate, FunctionResultTemplate> 
collectedMappingsPerMethod) {
+        collectedMappingsPerMethod
+                .keySet()
+                .forEach(
+                        signature -> {
+                            Boolean[] argumentOptional = 
signature.argumentOptionals;
+                            // Here we restrict that the argument must contain 
optional parameters
+                            // in order to obtain the 
FunctionSignatureTemplate of the method for
+                            // verification. Therefore, the extract method 
will only be called once.
+                            // If no function hint is set, this verification will 
not be executed.
+                            if (argumentOptional != null
+                                    && Arrays.stream(argumentOptional)
+                                            .anyMatch(Boolean::booleanValue)) {
+                                FunctionSignatureTemplate 
functionResultTemplate =
+                                        signatureExtraction.extract(this, 
method);
+                                for (int i = 0; i < argumentOptional.length; 
i++) {
+                                    DataType dataType =
+                                            
functionResultTemplate.argumentTemplates.get(i)
+                                                    .dataType;
+                                    if (dataType != null
+                                            && argumentOptional[i]
+                                            && dataType.getConversionClass() 
!= null
+                                            && 
dataType.getConversionClass().isPrimitive()) {
+                                        throw extractionError(
+                                                "Argument at position %d is 
optional but a primitive type doesn't accept null value.",
+                                                i);
+                                    }
+                                }
+                            }
+                        });
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Context sensitive extraction and verification logic
     // 
--------------------------------------------------------------------------------------------
@@ -338,7 +376,10 @@ abstract class BaseMappingExtractor {
 
             final String[] argumentNames = extractArgumentNames(method, 
offset);
 
-            return FunctionSignatureTemplate.of(parameterTypes, 
method.isVarArgs(), argumentNames);
+            final Boolean[] argumentOptionals = 
extractArgumentOptionals(method, offset);
+
+            return FunctionSignatureTemplate.of(
+                    parameterTypes, method.isVarArgs(), argumentNames, 
argumentOptionals);
         };
     }
 
@@ -416,6 +457,14 @@ abstract class BaseMappingExtractor {
         }
     }
 
+    static Boolean[] extractArgumentOptionals(Method method, int offset) {
+        return Arrays.stream(method.getParameters())
+                .skip(offset)
+                .map(parameter -> parameter.getAnnotation(ArgumentHint.class))
+                .map(argumentHint -> argumentHint != null && 
argumentHint.isOptional())
+                .toArray(Boolean[]::new);
+    }
+
     protected static ValidationException createMethodNotFoundError(
             String methodName, Class<?>[] parameters, @Nullable Class<?> 
returnType) {
         return extractionError(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
index 2efbbd0e8f6..855f11e6fa8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.types.extraction;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
@@ -44,19 +45,24 @@ final class FunctionSignatureTemplate {
 
     final @Nullable String[] argumentNames;
 
+    final Boolean[] argumentOptionals;
+
     private FunctionSignatureTemplate(
             List<FunctionArgumentTemplate> argumentTemplates,
             boolean isVarArgs,
-            @Nullable String[] argumentNames) {
+            @Nullable String[] argumentNames,
+            Boolean[] argumentOptionals) {
         this.argumentTemplates = argumentTemplates;
         this.isVarArgs = isVarArgs;
         this.argumentNames = argumentNames;
+        this.argumentOptionals = argumentOptionals;
     }
 
     static FunctionSignatureTemplate of(
             List<FunctionArgumentTemplate> argumentTemplates,
             boolean isVarArgs,
-            @Nullable String[] argumentNames) {
+            @Nullable String[] argumentNames,
+            Boolean[] argumentOptionals) {
         if (argumentNames != null && argumentNames.length != 
argumentTemplates.size()) {
             throw extractionError(
                     "Mismatch between number of argument names '%s' and 
argument types '%s'.",
@@ -67,7 +73,25 @@ final class FunctionSignatureTemplate {
             throw extractionError(
                     "Argument name conflict, there are at least two argument 
names that are the same.");
         }
-        return new FunctionSignatureTemplate(argumentTemplates, isVarArgs, 
argumentNames);
+        if (argumentOptionals != null && argumentOptionals.length != 
argumentTemplates.size()) {
+            throw extractionError(
+                    "Mismatch between number of argument optionals '%s' and 
argument types '%s'.",
+                    argumentOptionals.length, argumentTemplates.size());
+        }
+        if (argumentOptionals != null) {
+            for (int i = 0; i < argumentTemplates.size(); i++) {
+                DataType dataType = argumentTemplates.get(i).dataType;
+                if (dataType != null
+                        && !dataType.getLogicalType().isNullable()
+                        && argumentOptionals[i]) {
+                    throw extractionError(
+                            "Argument at position %s is optional but its type 
doesn't accept null value.",
+                            i);
+                }
+            }
+        }
+        return new FunctionSignatureTemplate(
+                argumentTemplates, isVarArgs, argumentNames, 
argumentOptionals);
     }
 
     InputTypeStrategy toInputTypeStrategy() {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
index 80e84b9137a..e18f5991477 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java
@@ -194,14 +194,17 @@ final class FunctionTemplate {
                     "Argument and input hints cannot be declared in the same 
function hint.");
         }
 
+        Boolean[] argumentOptionals = null;
         if (argumentHints != null) {
             argumentHintNames = new String[argumentHints.length];
             argumentHintTypes = new DataTypeHint[argumentHints.length];
+            argumentOptionals = new Boolean[argumentHints.length];
             boolean allArgumentNameNotSet = true;
             for (int i = 0; i < argumentHints.length; i++) {
                 ArgumentHint argumentHint = argumentHints[i];
                 argumentHintNames[i] = defaultAsNull(argumentHint, 
ArgumentHint::name);
                 argumentHintTypes[i] = defaultAsNull(argumentHint, 
ArgumentHint::type);
+                argumentOptionals[i] = argumentHint.isOptional();
                 if (argumentHintTypes[i] == null) {
                     throw extractionError("The type of the argument at 
position %d is not set.", i);
                 }
@@ -216,12 +219,13 @@ final class FunctionTemplate {
                 argumentHintNames = null;
             }
         } else {
+            if (inputs == null) {
+                return null;
+            }
             argumentHintTypes = inputs;
             argumentHintNames = argumentNames;
-        }
-
-        if (argumentHintTypes == null) {
-            return null;
+            argumentOptionals = new Boolean[inputs.length];
+            Arrays.fill(argumentOptionals, false);
         }
 
         return FunctionSignatureTemplate.of(
@@ -229,7 +233,8 @@ final class FunctionTemplate {
                         .map(dataTypeHint -> 
createArgumentTemplate(typeFactory, dataTypeHint))
                         .collect(Collectors.toList()),
                 isVarArg,
-                argumentHintNames);
+                argumentHintNames,
+                argumentOptionals);
     }
 
     private static FunctionArgumentTemplate createArgumentTemplate(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
index b817efa2b84..1b54abd5574 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java
@@ -229,6 +229,7 @@ public final class TypeInferenceExtractor {
         final TypeInference.Builder builder = TypeInference.newBuilder();
 
         configureNamedArguments(builder, outputMapping);
+        configureOptionalArguments(builder, outputMapping);
         configureTypedArguments(builder, outputMapping);
 
         builder.inputTypeStrategy(translateInputTypeStrategy(outputMapping));
@@ -268,6 +269,24 @@ public final class TypeInferenceExtractor {
         builder.namedArguments(argumentNames.iterator().next());
     }
 
+    private static void configureOptionalArguments(
+            TypeInference.Builder builder,
+            Map<FunctionSignatureTemplate, FunctionResultTemplate> 
outputMapping) {
+        final Set<FunctionSignatureTemplate> signatures = 
outputMapping.keySet();
+        if (signatures.stream().anyMatch(s -> s.isVarArgs || s.argumentNames 
== null)) {
+            return;
+        }
+        final List<List<Boolean>> argumentOptional =
+                signatures.stream()
+                        .filter(s -> s.argumentOptionals != null)
+                        .map(s -> Arrays.asList(s.argumentOptionals))
+                        .collect(Collectors.toList());
+        if (argumentOptional.size() != 1 || argumentOptional.size() != 
signatures.size()) {
+            return;
+        }
+        builder.optionalArguments(argumentOptional.get(0));
+    }
+
     private static void configureTypedArguments(
             TypeInference.Builder builder,
             Map<FunctionSignatureTemplate, FunctionResultTemplate> 
outputMapping) {
@@ -291,7 +310,8 @@ public final class TypeInferenceExtractor {
                         .collect(
                                 Collectors.toMap(
                                         e -> e.getKey().toInputTypeStrategy(),
-                                        e -> e.getValue().toTypeStrategy()));
+                                        e -> e.getValue().toTypeStrategy(),
+                                        (t1, t2) -> t2));
         return TypeStrategies.mapping(mappings);
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
index 921282b1256..e7bdaf77a93 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInference.java
@@ -46,6 +46,8 @@ public final class TypeInference {
 
     private final @Nullable List<String> namedArguments;
 
+    private final @Nullable List<Boolean> optionalArguments;
+
     private final @Nullable List<DataType> typedArguments;
 
     private final InputTypeStrategy inputTypeStrategy;
@@ -56,11 +58,13 @@ public final class TypeInference {
 
     private TypeInference(
             @Nullable List<String> namedArguments,
+            @Nullable List<Boolean> optionalArguments,
             @Nullable List<DataType> typedArguments,
             InputTypeStrategy inputTypeStrategy,
             @Nullable TypeStrategy accumulatorTypeStrategy,
             TypeStrategy outputTypeStrategy) {
         this.namedArguments = namedArguments;
+        this.optionalArguments = optionalArguments;
         this.typedArguments = typedArguments;
         this.inputTypeStrategy = inputTypeStrategy;
         this.accumulatorTypeStrategy = accumulatorTypeStrategy;
@@ -88,6 +92,10 @@ public final class TypeInference {
         return Optional.ofNullable(typedArguments);
     }
 
+    public Optional<List<Boolean>> getOptionalArguments() {
+        return Optional.ofNullable(optionalArguments);
+    }
+
     public InputTypeStrategy getInputTypeStrategy() {
         return inputTypeStrategy;
     }
@@ -108,6 +116,8 @@ public final class TypeInference {
 
         private @Nullable List<String> namedArguments;
 
+        private @Nullable List<Boolean> optionalArguments;
+
         private @Nullable List<DataType> typedArguments;
 
         private InputTypeStrategy inputTypeStrategy = 
InputTypeStrategies.WILDCARD;
@@ -140,6 +150,21 @@ public final class TypeInference {
             return namedArguments(Arrays.asList(argumentNames));
         }
 
+        /**
+         * Sets the list of argument optionals for specifying optional 
arguments in the input
+         * signature explicitly.
+         *
+         * <p>This information is useful for SQL's concept of named arguments 
using the assignment
+         * operator. Each flag indicates whether the argument at the 
corresponding position may be
+         * omitted in the function call.
+         */
+        public Builder optionalArguments(List<Boolean> optionalArguments) {
+            this.optionalArguments =
+                    Preconditions.checkNotNull(
+                            optionalArguments, "List of argument optionals 
must not be null.");
+            return this;
+        }
+
         /**
          * Sets the list of argument types for specifying a fixed, not 
overloaded, not vararg input
          * signature explicitly.
@@ -198,6 +223,7 @@ public final class TypeInference {
         public TypeInference build() {
             return new TypeInference(
                     namedArguments,
+                    optionalArguments,
                     typedArguments,
                     inputTypeStrategy,
                     accumulatorTypeStrategy,
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
index 483e156636e..dc3b36c9320 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java
@@ -573,6 +573,7 @@ class TypeInferenceExtractorTest {
                                 ArgumentHintOnParameterScalarFunction.class)
                         .expectNamedArguments("in1", "in2")
                         .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT())
+                        .expectOptionalArguments(false, false)
                         .expectOutputMapping(
                                 InputTypeStrategies.sequence(
                                         new String[] {"in1", "in2"},
@@ -600,7 +601,8 @@ class TypeInferenceExtractorTest {
                                 "A valid scalar class that declare 
FunctionHint for both class and method in the same class.",
                                 ValidFunctionHintOnClassAndMethod.class)
                         .expectNamedArguments("f1", "f2")
-                        .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT()),
+                        .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT())
+                        .expectOptionalArguments(true, true),
                 TestSpec.forScalarFunction(
                                 "The FunctionHint of the function conflicts 
with the method.",
                                 
ScalarFunctionWithFunctionHintConflictMethod.class)
@@ -609,7 +611,23 @@ class TypeInferenceExtractorTest {
                 // For function with overloaded function, argument name will 
be empty
                 TestSpec.forScalarFunction(
                         "Scalar function with overloaded functions and 
arguments hint declared.",
-                        
ArgumentsHintScalarFunctionWithOverloadedFunction.class));
+                        
ArgumentsHintScalarFunctionWithOverloadedFunction.class),
+                TestSpec.forScalarFunction(
+                                "Scalar function with argument type not null 
but optional.",
+                                
ArgumentHintNotNullTypeWithOptionalsScalarFunction.class)
+                        .expectErrorMessage(
+                                "Argument at position 0 is optional but its 
type doesn't accept null value."),
+                TestSpec.forScalarFunction(
+                                "Scalar function with arguments hint and 
variable length args",
+                                ArgumentHintVariableLengthScalarFunction.class)
+                        .expectOutputMapping(
+                                InputTypeStrategies.varyingSequence(
+                                        new String[] {"f1", "f2"},
+                                        new ArgumentTypeStrategy[] {
+                                            
InputTypeStrategies.explicit(DataTypes.STRING()),
+                                            
InputTypeStrategies.explicit(DataTypes.INT())
+                                        }),
+                                TypeStrategies.explicit(DataTypes.STRING())));
     }
 
     private static Stream<TestSpec> procedureSpecs() {
@@ -866,7 +884,114 @@ class TypeInferenceExtractorTest {
                 // no implementation
                 TestSpec.forProcedure(MissingMethodProcedure.class)
                         .expectErrorMessage(
-                                "Could not find a publicly accessible method 
named 'call'."));
+                                "Could not find a publicly accessible method 
named 'call'."),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument hint 
on method",
+                                ArgumentHintOnMethodProcedure.class)
+                        .expectNamedArguments("f1", "f2")
+                        .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT())
+                        .expectOptionalArguments(true, true)
+                        .expectOutputMapping(
+                                InputTypeStrategies.sequence(
+                                        new String[] {"f1", "f2"},
+                                        new ArgumentTypeStrategy[] {
+                                            
InputTypeStrategies.explicit(DataTypes.STRING()),
+                                            
InputTypeStrategies.explicit(DataTypes.INT())
+                                        }),
+                                TypeStrategies.explicit(
+                                        
DataTypes.INT().notNull().bridgedTo(int.class))),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument hint 
on class",
+                                ArgumentHintOnClassProcedure.class)
+                        .expectNamedArguments("f1", "f2")
+                        .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT())
+                        .expectOptionalArguments(true, true)
+                        .expectOutputMapping(
+                                InputTypeStrategies.sequence(
+                                        new String[] {"f1", "f2"},
+                                        new ArgumentTypeStrategy[] {
+                                            
InputTypeStrategies.explicit(DataTypes.STRING()),
+                                            
InputTypeStrategies.explicit(DataTypes.INT())
+                                        }),
+                                TypeStrategies.explicit(
+                                        
DataTypes.INT().notNull().bridgedTo(int.class))),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument hint 
on parameter",
+                                ArgumentHintOnParameterProcedure.class)
+                        .expectNamedArguments("parameter_f1", "parameter_f2")
+                        .expectTypedArguments(
+                                DataTypes.STRING(), 
DataTypes.INT().bridgedTo(int.class))
+                        .expectOptionalArguments(true, false)
+                        .expectOutputMapping(
+                                InputTypeStrategies.sequence(
+                                        new String[] {"parameter_f1", 
"parameter_f2"},
+                                        new ArgumentTypeStrategy[] {
+                                            
InputTypeStrategies.explicit(DataTypes.STRING()),
+                                            InputTypeStrategies.explicit(
+                                                    
DataTypes.INT().bridgedTo(int.class))
+                                        }),
+                                TypeStrategies.explicit(
+                                        
DataTypes.INT().notNull().bridgedTo(int.class))),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument hint 
on method and parameter",
+                                
ArgumentHintOnMethodAndParameterProcedure.class)
+                        .expectNamedArguments("local_f1", "local_f2")
+                        .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT())
+                        .expectOptionalArguments(true, true)
+                        .expectOutputMapping(
+                                InputTypeStrategies.sequence(
+                                        new String[] {"local_f1", "local_f2"},
+                                        new ArgumentTypeStrategy[] {
+                                            
InputTypeStrategies.explicit(DataTypes.STRING()),
+                                            
InputTypeStrategies.explicit(DataTypes.INT())
+                                        }),
+                                TypeStrategies.explicit(
+                                        
DataTypes.INT().notNull().bridgedTo(int.class))),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument hint 
on class and method",
+                                ArgumentHintOnClassAndMethodProcedure.class)
+                        .expectNamedArguments("global_f1", "global_f2")
+                        .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT())
+                        .expectOptionalArguments(false, false)
+                        .expectOutputMapping(
+                                InputTypeStrategies.sequence(
+                                        new String[] {"global_f1", 
"global_f2"},
+                                        new ArgumentTypeStrategy[] {
+                                            
InputTypeStrategies.explicit(DataTypes.STRING()),
+                                            
InputTypeStrategies.explicit(DataTypes.INT())
+                                        }),
+                                TypeStrategies.explicit(
+                                        
DataTypes.INT().notNull().bridgedTo(int.class))),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument hint 
on class and method and parameter",
+                                
ArgumentHintOnClassAndMethodAndParameterProcedure.class)
+                        .expectNamedArguments("global_f1", "global_f2")
+                        .expectTypedArguments(DataTypes.STRING(), 
DataTypes.INT())
+                        .expectOptionalArguments(false, false)
+                        .expectOutputMapping(
+                                InputTypeStrategies.sequence(
+                                        new String[] {"global_f1", 
"global_f2"},
+                                        new ArgumentTypeStrategy[] {
+                                            
InputTypeStrategies.explicit(DataTypes.STRING()),
+                                            
InputTypeStrategies.explicit(DataTypes.INT())
+                                        }),
+                                TypeStrategies.explicit(
+                                        
DataTypes.INT().notNull().bridgedTo(int.class))),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument hint 
type not null but optional",
+                                ArgumentHintNotNullWithOptionalProcedure.class)
+                        .expectErrorMessage(
+                                "Argument at position 1 is optional but its 
type doesn't accept null value."),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with argument name 
conflict",
+                                ArgumentHintNameConflictProcedure.class)
+                        .expectErrorMessage(
+                                "Argument name conflict, there are at least 
two argument names that are the same."),
+                TestSpec.forProcedure(
+                                "Named arguments procedure with optional type 
on primitive type",
+                                
ArgumentHintOptionalOnPrimitiveParameterConflictProcedure.class)
+                        .expectErrorMessage(
+                                "Argument at position 1 is optional but a 
primitive type doesn't accept null value."));
     }
 
     @ParameterizedTest(name = "{index}: {0}")
@@ -881,6 +1006,15 @@ class TypeInferenceExtractorTest {
         }
     }
 
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("testData")
+    void testArgumentOptionals(TestSpec testSpec) {
+        if (testSpec.expectedArgumentOptionals != null) {
+            
assertThat(testSpec.typeInferenceExtraction.get().getOptionalArguments())
+                    
.isEqualTo(Optional.of(testSpec.expectedArgumentOptionals));
+        }
+    }
+
     @ParameterizedTest(name = "{index}: {0}")
     @MethodSource("testData")
     void testArgumentTypes(TestSpec testSpec) {
@@ -956,6 +1090,8 @@ class TypeInferenceExtractorTest {
 
         @Nullable List<String> expectedArgumentNames;
 
+        @Nullable List<Boolean> expectedArgumentOptionals;
+
         @Nullable List<DataType> expectedArgumentTypes;
 
         Map<InputTypeStrategy, TypeStrategy> expectedAccumulatorStrategies;
@@ -1045,6 +1181,11 @@ class TypeInferenceExtractorTest {
             return this;
         }
 
+        TestSpec expectOptionalArguments(Boolean... expectedArgumentOptionals) 
{
+            this.expectedArgumentOptionals = 
Arrays.asList(expectedArgumentOptionals);
+            return this;
+        }
+
         TestSpec expectTypedArguments(DataType... expectedArgumentTypes) {
             this.expectedArgumentTypes = Arrays.asList(expectedArgumentTypes);
             return this;
@@ -1625,6 +1766,176 @@ class TypeInferenceExtractorTest {
         }
     }
 
+    private static class ArgumentHintOnMethodProcedure implements Procedure {
+        @ProcedureHint(
+                argument = {
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1", 
isOptional = true),
+                    @ArgumentHint(type = @DataTypeHint("INTEGER"), name = 
"f2", isOptional = true)
+                })
+        public int[] call(Object procedureContext, String f1, Integer f2) {
+            return null;
+        }
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1", 
isOptional = true),
+                @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2", 
isOptional = true)
+            })
+    private static class ArgumentHintOnClassProcedure implements Procedure {
+        public int[] call(Object procedureContext, String f1, Integer f2) {
+            return null;
+        }
+    }
+
+    private static class ArgumentHintOnParameterProcedure implements Procedure 
{
+        public int[] call(
+                Object procedureContext,
+                @ArgumentHint(
+                                type = @DataTypeHint("STRING"),
+                                name = "parameter_f1",
+                                isOptional = true)
+                        String f1,
+                @ArgumentHint(
+                                type = @DataTypeHint("INT"),
+                                name = "parameter_f2",
+                                isOptional = false)
+                        int f2) {
+            return null;
+        }
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(
+                        type = @DataTypeHint("STRING"),
+                        name = "global_f1",
+                        isOptional = false),
+                @ArgumentHint(
+                        type = @DataTypeHint("INTEGER"),
+                        name = "global_f2",
+                        isOptional = false)
+            })
+    private static class ArgumentHintOnClassAndMethodProcedure implements 
Procedure {
+        @ProcedureHint(
+                argument = {
+                    @ArgumentHint(
+                            type = @DataTypeHint("STRING"),
+                            name = "local_f1",
+                            isOptional = true),
+                    @ArgumentHint(
+                            type = @DataTypeHint("INTEGER"),
+                            name = "local_f2",
+                            isOptional = true)
+                })
+        public int[] call(Object procedureContext, String f1, Integer f2) {
+            return null;
+        }
+    }
+
+    private static class ArgumentHintOnMethodAndParameterProcedure implements 
Procedure {
+        @ProcedureHint(
+                argument = {
+                    @ArgumentHint(
+                            type = @DataTypeHint("STRING"),
+                            name = "local_f1",
+                            isOptional = true),
+                    @ArgumentHint(
+                            type = @DataTypeHint("INTEGER"),
+                            name = "local_f2",
+                            isOptional = true)
+                })
+        public int[] call(
+                Object procedureContext,
+                @ArgumentHint(
+                                type = @DataTypeHint("INTEGER"),
+                                name = "parameter_f1",
+                                isOptional = true)
+                        String f1,
+                @ArgumentHint(
+                                type = @DataTypeHint("INTEGER"),
+                                name = "parameter_f2",
+                                isOptional = false)
+                        Integer f2) {
+            return null;
+        }
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(
+                        type = @DataTypeHint("STRING"),
+                        name = "global_f1",
+                        isOptional = false),
+                @ArgumentHint(
+                        type = @DataTypeHint("INTEGER"),
+                        name = "global_f2",
+                        isOptional = false)
+            })
+    private static class ArgumentHintOnClassAndMethodAndParameterProcedure 
implements Procedure {
+        @ProcedureHint(
+                argument = {
+                    @ArgumentHint(
+                            type = @DataTypeHint("STRING"),
+                            name = "local_f1",
+                            isOptional = true),
+                    @ArgumentHint(
+                            type = @DataTypeHint("INTEGER"),
+                            name = "local_f2",
+                            isOptional = true)
+                })
+        public int[] call(
+                Object procedureContext,
+                @ArgumentHint(
+                                type = @DataTypeHint("STRING"),
+                                name = "parameter_f1",
+                                isOptional = false)
+                        String f1,
+                Integer f2) {
+            return null;
+        }
+    }
+
+    private static class ArgumentHintNotNullWithOptionalProcedure implements 
Procedure {
+        @ProcedureHint(
+                argument = {
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1", 
isOptional = true),
+                    @ArgumentHint(
+                            type = @DataTypeHint("INTEGER NOT NULL"),
+                            name = "f2",
+                            isOptional = true)
+                })
+        public int[] call(Object procedureContext, String f1, Integer f2) {
+            return null;
+        }
+    }
+
+    private static class ArgumentHintNameConflictProcedure implements 
Procedure {
+        @ProcedureHint(
+                argument = {
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1", 
isOptional = true),
+                    @ArgumentHint(
+                            type = @DataTypeHint("INTEGER NOT NULL"),
+                            name = "f1",
+                            isOptional = true)
+                })
+        public int[] call(Object procedureContext, String f1, Integer f2) {
+            return null;
+        }
+    }
+
+    private static class 
ArgumentHintOptionalOnPrimitiveParameterConflictProcedure
+            implements Procedure {
+        @ProcedureHint(
+                argument = {
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1", 
isOptional = true),
+                    @ArgumentHint(type = @DataTypeHint("INTEGER"), name = 
"f2", isOptional = true)
+                })
+        public int[] call(Object procedureContext, String f1, int f2) {
+            return null;
+        }
+    }
+
     private static class ZeroArgFunctionAsync extends AsyncScalarFunction {
         public void eval(CompletableFuture<Integer> f) {}
     }
@@ -1761,8 +2072,8 @@ class TypeInferenceExtractorTest {
 
     @FunctionHint(
             argument = {
-                @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1"),
-                @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2")
+                @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1", 
isOptional = true),
+                @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2", 
isOptional = true)
             })
     private static class ValidFunctionHintOnClassAndMethod extends 
ScalarFunction {
         @FunctionHint(
@@ -1810,4 +2121,30 @@ class TypeInferenceExtractorTest {
             return "";
         }
     }
+
+    private static class ArgumentHintNotNullTypeWithOptionalsScalarFunction 
extends ScalarFunction {
+        @FunctionHint(
+                argument = {
+                    @ArgumentHint(
+                            type = @DataTypeHint("STRING NOT NULL"),
+                            name = "f1",
+                            isOptional = true),
+                    @ArgumentHint(type = @DataTypeHint("INTEGER"), name = 
"f2", isOptional = true)
+                })
+        public String eval(String f1, Integer f2) {
+            return "";
+        }
+    }
+
+    private static class ArgumentHintVariableLengthScalarFunction extends 
ScalarFunction {
+        @FunctionHint(
+                argument = {
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = "f1"),
+                    @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2")
+                },
+                isVarArgs = true)
+        public String eval(String f1, Integer... f2) {
+            return "";
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index bccced1342b..0c5d7dda150 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -18,6 +18,7 @@ package org.apache.calcite.sql2rel;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
 import org.apache.flink.table.planner.calcite.TimestampSchemaVersion;
 import 
org.apache.flink.table.planner.hint.ClearQueryHintsWithInvalidPropagationShuttle;
 import org.apache.flink.table.planner.hint.FlinkHints;
@@ -241,7 +242,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *   <li>Added in FLINK-32474: Lines 2875 ~ 2887
  *   <li>Added in FLINK-32474: Lines 2987 ~ 3021
  *   <li>Added in FLINK-20873: Lines 5519 ~ 5528
- *   <li>Added in FLINK-34057: Lines 6089 ~ 6092
+ *   <li>Added in FLINK-34057, FLINK-34058: Lines 6090 ~ 6116
  * </ol>
  */
 @SuppressWarnings("UnstableApiUsage")
@@ -6087,10 +6088,13 @@ public class SqlToRelConverter {
                 // switch out of agg mode
                 bb.agg = null;
                 // ----- FLINK MODIFICATION BEGIN -----
-                for (SqlNode operand :
-                        new SqlCallBinding(validator(), 
aggregatingSelectScope, call).operands())
-                // ----- FLINK MODIFICATION END -----
-                {
+                SqlCallBinding sqlCallBinding =
+                        new SqlCallBinding(validator(), 
aggregatingSelectScope, call);
+                List<SqlNode> sqlNodes = sqlCallBinding.operands();
+                FlinkOperatorBinding flinkOperatorBinding =
+                        new FlinkOperatorBinding(sqlCallBinding);
+                for (int i = 0; i < sqlNodes.size(); i++) {
+                    SqlNode operand = sqlNodes.get(i);
                     // special case for COUNT(*):  delete the *
                     if (operand instanceof SqlIdentifier) {
                         SqlIdentifier id = (SqlIdentifier) operand;
@@ -6101,8 +6105,15 @@ public class SqlToRelConverter {
                         }
                     }
                     RexNode convertedExpr = bb.convertExpression(operand);
+                    if (convertedExpr.getKind() == SqlKind.DEFAULT) {
+                        RelDataType relDataType = 
flinkOperatorBinding.getOperandType(i);
+                        convertedExpr =
+                                ((RexCall) convertedExpr)
+                                        .clone(relDataType, ((RexCall) 
convertedExpr).operands);
+                    }
                     args.add(lookupOrCreateGroupExpr(convertedExpr));
                 }
+                // ----- FLINK MODIFICATION END -----
 
                 if (filter != null) {
                     RexNode convertedExpr = bb.convertExpression(filter);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
index 8f319dd1e23..0a22585ea5c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.SqlKind;
@@ -43,8 +44,10 @@ import org.apache.calcite.sql2rel.SqlRexConvertlet;
 import org.apache.calcite.sql2rel.SqlRexConvertletTable;
 import org.apache.calcite.sql2rel.StandardConvertletTable;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -69,6 +72,10 @@ public class FlinkConvertletTable implements 
SqlRexConvertletTable {
             return this::convertSetSemanticsWindowTableFunction;
         }
 
+        if (isContainsDefaultNode(call)) {
+            return this::convertSqlCallWithDefaultNode;
+        }
+
         return StandardConvertletTable.INSTANCE.get(call);
     }
 
@@ -113,6 +120,12 @@ public class FlinkConvertletTable implements 
SqlRexConvertletTable {
         return !operands.isEmpty() && operands.get(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE;
     }
 
+    private boolean isContainsDefaultNode(SqlCall call) {
+        return call.getOperandList().stream()
+                .filter(Objects::nonNull)
+                .anyMatch(operand -> operand.getKind() == SqlKind.DEFAULT);
+    }
+
     /**
      * Due to CALCITE-6204, we need to manually extract partition keys and 
order keys and convert
      * them to {@link RexSetSemanticsTableCall}.
@@ -202,4 +215,30 @@ public class FlinkConvertletTable implements 
SqlRexConvertletTable {
         // should not happen
         throw new TableException("Unsupported partition key with type: " + 
e.getKind());
     }
+
+    /**
+     * When the SqlCall contains a default operator, the type of the default node is
+     * changed to ANY after it is converted to a rel node. However, the ANY type cannot
+     * pass various checks well and cannot adapt well to types in Flink. Therefore, we
+     * replace the ANY type with the argument type obtained from the operator.
+     */
+    private RexNode convertSqlCallWithDefaultNode(SqlRexContext cx, final 
SqlCall call) {
+        RexNode rexCall = StandardConvertletTable.INSTANCE.convertCall(cx, 
call);
+        SqlCallBinding sqlCallBinding = new SqlCallBinding(cx.getValidator(), 
null, call);
+        FlinkOperatorBinding flinkOperatorBinding = new 
FlinkOperatorBinding(sqlCallBinding);
+        if (rexCall instanceof RexCall) {
+            List<RexNode> operands = new ArrayList<>(((RexCall) 
rexCall).operands);
+            for (int i = 0; i < operands.size(); i++) {
+                RexNode rexNode = operands.get(i);
+                if (rexNode.getKind() == SqlKind.DEFAULT && rexNode instanceof 
RexCall) {
+                    RelDataType relDataType = 
flinkOperatorBinding.getOperandType(i);
+                    operands.set(
+                            i,
+                            ((RexCall) rexNode).clone(relDataType, ((RexCall) 
rexNode).operands));
+                }
+            }
+            return ((RexCall) rexCall).clone(rexCall.getType(), operands);
+        }
+        return rexCall;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
new file mode 100644
index 00000000000..41e50d8739a
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.runtime.Resources;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A proxy implementation of {@link SqlOperatorBinding} used to correct the operand
+ * types when named parameters are used.
+ */
+public class FlinkOperatorBinding extends SqlOperatorBinding {
+
+    private final SqlOperatorBinding sqlOperatorBinding;
+
+    private final List<RelDataType> argumentTypes;
+
+    public FlinkOperatorBinding(SqlOperatorBinding sqlOperatorBinding) {
+        super(sqlOperatorBinding.getTypeFactory(), 
sqlOperatorBinding.getOperator());
+        this.sqlOperatorBinding = sqlOperatorBinding;
+        this.argumentTypes = getArgumentTypes();
+    }
+
+    @Override
+    public int getOperandCount() {
+        if (!argumentTypes.isEmpty()) {
+            return argumentTypes.size();
+        } else {
+            return sqlOperatorBinding.getOperandCount();
+        }
+    }
+
+    @Override
+    public RelDataType getOperandType(int ordinal) {
+        if (sqlOperatorBinding instanceof SqlCallBinding) {
+            SqlNode sqlNode = ((SqlCallBinding) 
sqlOperatorBinding).operands().get(ordinal);
+            if (sqlNode.getKind() == SqlKind.DEFAULT && 
!argumentTypes.isEmpty()) {
+                return argumentTypes.get(ordinal);
+            } else {
+                return ((SqlCallBinding) sqlOperatorBinding)
+                        .getValidator()
+                        .deriveType(((SqlCallBinding) 
sqlOperatorBinding).getScope(), sqlNode);
+            }
+        } else if (sqlOperatorBinding instanceof RexCallBinding) {
+            RexNode rexNode = ((RexCallBinding) 
sqlOperatorBinding).operands().get(ordinal);
+            if (rexNode.getKind() == SqlKind.DEFAULT && 
!argumentTypes.isEmpty()) {
+                return argumentTypes.get(ordinal);
+            } else {
+                return rexNode.getType();
+            }
+        }
+        return sqlOperatorBinding.getOperandType(ordinal);
+    }
+
+    @Override
+    public CalciteException newError(Resources.ExInst<SqlValidatorException> 
e) {
+        return sqlOperatorBinding.newError(e);
+    }
+
+    @Override
+    public int getGroupCount() {
+        return sqlOperatorBinding.getGroupCount();
+    }
+
+    @Override
+    public boolean hasFilter() {
+        return sqlOperatorBinding.hasFilter();
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return sqlOperatorBinding.getOperator();
+    }
+
+    @Override
+    public RelDataTypeFactory getTypeFactory() {
+        return sqlOperatorBinding.getTypeFactory();
+    }
+
+    @Override
+    public @Nullable String getStringLiteralOperand(int ordinal) {
+        return sqlOperatorBinding.getStringLiteralOperand(ordinal);
+    }
+
+    @Override // to be removed before 2.0
+    public int getIntLiteralOperand(int ordinal) {
+        return sqlOperatorBinding.getIntLiteralOperand(ordinal);
+    }
+
+    @Override
+    public boolean isOperandNull(int ordinal, boolean allowCast) {
+        return sqlOperatorBinding.isOperandNull(ordinal, allowCast);
+    }
+
+    @Override
+    public boolean isOperandLiteral(int ordinal, boolean allowCast) {
+        return sqlOperatorBinding.isOperandLiteral(ordinal, allowCast);
+    }
+
+    @Override
+    public @Nullable Object getOperandLiteralValue(int ordinal, RelDataType 
type) {
+        return sqlOperatorBinding.getOperandLiteralValue(ordinal, type);
+    }
+
+    @Override
+    public @Nullable Comparable getOperandLiteralValue(int ordinal) {
+        return sqlOperatorBinding.getOperandLiteralValue(ordinal);
+    }
+
+    @Override
+    public SqlMonotonicity getOperandMonotonicity(int ordinal) {
+        return sqlOperatorBinding.getOperandMonotonicity(ordinal);
+    }
+
+    @Override
+    public List<RelDataType> collectOperandTypes() {
+        return sqlOperatorBinding.collectOperandTypes();
+    }
+
+    @Override
+    public @Nullable RelDataType getCursorOperand(int ordinal) {
+        return sqlOperatorBinding.getCursorOperand(ordinal);
+    }
+
+    @Override
+    public @Nullable String getColumnListParamInfo(
+            int ordinal, String paramName, List<String> columnList) {
+        return sqlOperatorBinding.getColumnListParamInfo(ordinal, paramName, 
columnList);
+    }
+
+    @Override
+    public <T extends Object> @Nullable T getOperandLiteralValue(int ordinal, 
Class<T> clazz) {
+        return sqlOperatorBinding.getOperandLiteralValue(ordinal, clazz);
+    }
+
+    private List<RelDataType> getArgumentTypes() {
+        SqlOperandTypeChecker sqlOperandTypeChecker = 
getOperator().getOperandTypeChecker();
+        if (sqlOperandTypeChecker != null
+                && sqlOperandTypeChecker.isFixedParameters()
+                && sqlOperandTypeChecker instanceof SqlOperandMetadata) {
+            SqlOperandMetadata sqlOperandMetadata =
+                    ((SqlOperandMetadata) 
getOperator().getOperandTypeChecker());
+            return sqlOperandMetadata.paramTypes(getTypeFactory());
+        } else {
+            return Collections.emptyList();
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index cd909ba79db..65aefccad2e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.functions.inference;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlCallBinding;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -51,28 +53,29 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
 
     private final List<DataType> argumentDataTypes;
 
+    private final SqlOperatorBinding binding;
+
     private final @Nullable DataType outputType;
 
     public CallBindingCallContext(
             DataTypeFactory dataTypeFactory,
             FunctionDefinition definition,
-            SqlCallBinding binding,
+            SqlCallBinding sqlCallBinding,
             @Nullable RelDataType outputType) {
         super(
                 dataTypeFactory,
                 definition,
-                binding.getOperator().getNameAsId().toString(),
-                binding.getGroupCount() > 0);
+                sqlCallBinding.getOperator().getNameAsId().toString(),
+                sqlCallBinding.getGroupCount() > 0);
 
-        this.adaptedArguments = binding.operands(); // reorders the operands
+        this.adaptedArguments = sqlCallBinding.operands(); // reorders the 
operands
+        this.binding = new FlinkOperatorBinding(sqlCallBinding);
         this.argumentDataTypes =
                 new AbstractList<DataType>() {
                     @Override
                     public DataType get(int pos) {
-                        final RelDataType relDataType =
-                                binding.getValidator()
-                                        .deriveType(binding.getScope(), 
adaptedArguments.get(pos));
-                        final LogicalType logicalType = 
FlinkTypeFactory.toLogicalType(relDataType);
+                        final LogicalType logicalType =
+                                
FlinkTypeFactory.toLogicalType(binding.getOperandType(pos));
                         return 
TypeConversions.fromLogicalToDataType(logicalType);
                     }
 
@@ -81,7 +84,7 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
                         return binding.getOperandCount();
                     }
                 };
-        this.outputType = convertOutputType(binding, outputType);
+        this.outputType = convertOutputType(sqlCallBinding, outputType);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index 103d37cc03e..9961354e304 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -21,15 +21,13 @@ package org.apache.flink.table.planner.functions.inference;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorBinding;
 
 import javax.annotation.Nullable;
@@ -54,35 +52,22 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
     public OperatorBindingCallContext(
             DataTypeFactory dataTypeFactory,
             FunctionDefinition definition,
-            SqlOperatorBinding binding,
+            SqlOperatorBinding sqlOperatorBinding,
             RelDataType returnRelDataType) {
         super(
                 dataTypeFactory,
                 definition,
-                binding.getOperator().getNameAsId().toString(),
-                binding.getGroupCount() > 0);
+                sqlOperatorBinding.getOperator().getNameAsId().toString(),
+                sqlOperatorBinding.getGroupCount() > 0);
 
-        this.binding = binding;
+        this.binding = new FlinkOperatorBinding(sqlOperatorBinding);
         this.argumentDataTypes =
                 new AbstractList<DataType>() {
                     @Override
                     public DataType get(int pos) {
-                        if (binding instanceof SqlCallBinding) {
-                            SqlCallBinding sqlCallBinding = (SqlCallBinding) 
binding;
-                            List<SqlNode> operands = sqlCallBinding.operands();
-                            final RelDataType relDataType =
-                                    sqlCallBinding
-                                            .getValidator()
-                                            .deriveType(
-                                                    sqlCallBinding.getScope(), 
operands.get(pos));
-                            final LogicalType logicalType =
-                                    
FlinkTypeFactory.toLogicalType(relDataType);
-                            return 
TypeConversions.fromLogicalToDataType(logicalType);
-                        } else {
-                            final LogicalType logicalType =
-                                    toLogicalType(binding.getOperandType(pos));
-                            return fromLogicalToDataType(logicalType);
-                        }
+                        LogicalType logicalType =
+                                
FlinkTypeFactory.toLogicalType(binding.getOperandType(pos));
+                        return fromLogicalToDataType(logicalType);
                     }
 
                     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
index 5c122ca5038..3d3301d3c48 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java
@@ -24,7 +24,9 @@ import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
 import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -32,6 +34,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperandCountRange;
 import org.apache.calcite.sql.SqlOperator;
@@ -44,6 +47,7 @@ import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorNamespace;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
@@ -101,7 +105,20 @@ public final class TypeInferenceOperandChecker
 
     @Override
     public SqlOperandCountRange getOperandCountRange() {
-        return countRange;
+        if (typeInference.getOptionalArguments().isPresent()
+                && typeInference.getOptionalArguments().get().stream()
+                        .anyMatch(Boolean::booleanValue)) {
+            int notOptionalCount =
+                    (int)
+                            typeInference.getOptionalArguments().get().stream()
+                                    .filter(optional -> !optional)
+                                    .count();
+            ArgumentCount argumentCount =
+                    ConstantArgumentCount.between(notOptionalCount, 
countRange.getMax());
+            return new ArgumentCountRange(argumentCount);
+        } else {
+            return countRange;
+        }
     }
 
     @Override
@@ -116,7 +133,22 @@ public final class TypeInferenceOperandChecker
 
     @Override
     public boolean isOptional(int i) {
-        return false;
+        Optional<List<Boolean>> optionalArguments = 
typeInference.getOptionalArguments();
+        if (optionalArguments.isPresent()) {
+            return optionalArguments.get().get(i);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isFixedParameters() {
+        // This method returns true only if optional arguments are declared 
and at least one
+        // optional argument is present.
+        // Otherwise, it defaults to false, bypassing the parameter check in 
Calcite.
+        return typeInference.getOptionalArguments().isPresent()
+                && typeInference.getOptionalArguments().get().stream()
+                        .anyMatch(Boolean::booleanValue);
     }
 
     @Override
@@ -173,6 +205,11 @@ public final class TypeInferenceOperandChecker
         final List<SqlNode> operands = callBinding.operands();
         for (int i = 0; i < operands.size(); i++) {
             final LogicalType expectedType = 
expectedDataTypes.get(i).getLogicalType();
+            SqlNode sqlNode = operands.get(i);
+            // skip default node
+            if (sqlNode.getKind() == SqlKind.DEFAULT) {
+                continue;
+            }
             final LogicalType argumentType =
                     toLogicalType(SqlTypeUtil.deriveType(callBinding, 
operands.get(i)));
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index c2b3653aa69..f43d1ff80bf 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.SqlMatchRecognize;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlPostfixOperator;
 import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.InferTypes;
@@ -1286,4 +1287,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     .operandTypeChecker(OperandTypes.NILADIC)
                     .notDeterministic()
                     .build();
+
+    // DEFAULT FUNCTION
+    // The default operator is used to fill in missing parameters when using named parameters,
+    // which is used during code generation and not exposed to the user by 
default.
+    public static final SqlSpecialOperator DEFAULT = 
SqlStdOperatorTable.DEFAULT;
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
index cd8ad87097b..8638efae8b0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/PlannerCallProcedureOperation.java
@@ -131,7 +131,9 @@ public class PlannerCallProcedureOperation implements 
CallProcedureOperation {
         argumentVal[0] = new DefaultProcedureContext(env);
         for (int i = 0; i < internalInputArguments.length; i++) {
             argumentVal[i + 1] =
-                    toExternal(internalInputArguments[i], inputTypes[i], 
userClassLoader);
+                    (internalInputArguments[i] != null)
+                            ? toExternal(internalInputArguments[i], 
inputTypes[i], userClassLoader)
+                            : null;
         }
         return argumentVal;
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
index af73ccfc1e1..8205d52ff7e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
@@ -21,16 +21,17 @@ package 
org.apache.flink.table.planner.operations.converters;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure;
 import 
org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
 import org.apache.flink.table.planner.operations.PlannerCallProcedureOperation;
 import org.apache.flink.table.planner.plan.utils.RexLiteralUtil;
-import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
 import org.apache.flink.table.procedures.ProcedureDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.ExplicitOperatorBinding;
@@ -47,6 +48,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter.toRelDataType;
+
 /**
  * A converter for call procedure node. The call procedure statement will be 
parsed to a SqlCall
  * wrapping SqlProcedureCallOperator as the operator by calcite. So, this 
converter will try to
@@ -70,12 +73,14 @@ public class SqlProcedureCallConverter implements 
SqlNodeConverter<SqlNode> {
 
         SqlCallBinding sqlCallBinding =
                 new SqlCallBinding(context.getSqlValidator(), null, 
callProcedure);
+
+        List<RexNode> reducedOperands = reduceOperands(sqlCallBinding, 
context);
         SqlOperatorBinding sqlOperatorBinding =
                 new ExplicitOperatorBinding(
                         context.getSqlValidator().getTypeFactory(),
                         sqlProcedure,
-                        sqlCallBinding.operands().stream()
-                                .map(sqlValidator::getValidatedNodeType)
+                        reducedOperands.stream()
+                                .map(RexNode::getType)
                                 .collect(Collectors.toList()));
 
         OperatorBindingCallContext bindingCallContext =
@@ -93,8 +98,6 @@ public class SqlProcedureCallConverter implements 
SqlNodeConverter<SqlNode> {
                                 
context.getCatalogManager().getDataTypeFactory()),
                         bindingCallContext,
                         null);
-
-        List<RexNode> reducedOperands = 
reduceOperands(sqlCallBinding.operands(), context);
         List<DataType> argumentTypes = 
typeInferResult.getExpectedArgumentTypes();
         int argumentCount = argumentTypes.size();
         DataType[] inputTypes = new DataType[argumentCount];
@@ -126,19 +129,30 @@ public class SqlProcedureCallConverter implements 
SqlNodeConverter<SqlNode> {
                 typeInferResult.getOutputDataType());
     }
 
-    private List<RexNode> reduceOperands(List<SqlNode> sqlNodes, 
ConvertContext context) {
+    private List<RexNode> reduceOperands(SqlCallBinding sqlCallBinding, 
ConvertContext context) {
         // we don't really care about the input row type while converting to 
RexNode
         // since call procedure shouldn't refer any inputs.
         // so, construct an empty row for it.
         RelDataType inputRowType =
-                LogicalRelDataTypeConverter.toRelDataType(
+                toRelDataType(
                         DataTypes.ROW().getLogicalType(),
                         context.getSqlValidator().getTypeFactory());
         List<RexNode> rexNodes = new ArrayList<>();
-        for (SqlNode sqlNode : sqlNodes) {
-            RexNode rexNode = context.toRexNode(sqlNode, inputRowType, null);
-            rexNodes.add(rexNode);
+        List<SqlNode> operands = sqlCallBinding.operands();
+        FlinkOperatorBinding flinkOperatorBinding = new 
FlinkOperatorBinding(sqlCallBinding);
+        for (int i = 0; i < operands.size(); i++) {
+            RexNode rexNode = context.toRexNode(operands.get(i), inputRowType, 
null);
+            if (rexNode.getKind() == SqlKind.DEFAULT) {
+                rexNodes.add(
+                        ((RexCall) rexNode)
+                                .clone(
+                                        flinkOperatorBinding.getOperandType(i),
+                                        ((RexCall) rexNode).operands));
+            } else {
+                rexNodes.add(rexNode);
+            }
         }
-        return context.reduceRexNodes(rexNodes);
+        rexNodes = context.reduceRexNodes(rexNodes);
+        return rexNodes;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index a589a495dee..9352a8388ef 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.data.util.DataFormatConverters
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
GeneratedExpression}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
-import 
org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull,
 generateCallIfArgsNullable, generateNonNullField, 
generateStringResultCallIfArgsNotNull}
+import 
org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull,
 generateCallIfArgsNullable, generateNonNullField, generateNullLiteral, 
generateStringResultCallIfArgsNotNull}
 import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
 import org.apache.flink.table.runtime.functions.SqlFunctionUtils
@@ -237,6 +237,9 @@ object StringCallGen {
         val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
         generateNonNullField(returnType, currentDatabase)
 
+      case DEFAULT =>
+        generateNullLiteral(returnType)
+
       case _ => null
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
index 71190ffc512..126c9643ea5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestProcedureCatalogFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.factories;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.catalog.Catalog;
@@ -90,6 +91,9 @@ public class TestProcedureCatalogFactory implements 
CatalogFactory {
             PROCEDURE_MAP.put(
                     ObjectPath.fromString("system.named_args_overload"),
                     new NamedArgumentsProcedureWithOverload());
+            PROCEDURE_MAP.put(
+                    ObjectPath.fromString("system.named_args_optional"),
+                    new NamedArgumentsProcedureWithOptionalArguments());
         }
 
         public CatalogWithBuiltInProcedure(String name) {
@@ -212,6 +216,20 @@ public class TestProcedureCatalogFactory implements 
CatalogFactory {
         }
     }
 
+    /** A procedure with named arguments and optional arguments. */
+    public static class NamedArgumentsProcedureWithOptionalArguments 
implements Procedure {
+
+        @ProcedureHint(
+                output = @DataTypeHint("STRING"),
+                argument = {
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = "c", 
isOptional = true),
+                    @ArgumentHint(type = @DataTypeHint("INT"), name = "d", 
isOptional = true)
+                })
+        public String[] call(ProcedureContext procedureContext, String arg1, 
Integer arg2) {
+            return new String[] {arg1 + ", " + arg2};
+        }
+    }
+
     /** A simple pojo class for testing purpose. */
     public static class UserPojo {
         private final String name;
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index 9ace5dff76a..f0623717681 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -1074,6 +1074,25 @@ public class FunctionITCase extends StreamingTestBase {
         
assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
     }
 
+    @Test
+    void testNamedArgumentsTableFunctionWithOptionalArguments() throws 
Exception {
+        final Row[] sinkData = new Row[] {Row.of("null, str2")};
+
+        TestCollectionTableFactory.reset();
+
+        tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' 
= 'COLLECTION')");
+
+        tEnv().createFunction(
+                        "NamedArgumentsTableFunctionWithOptionalArguments",
+                        
NamedArgumentsTableFunctionWithOptionalArguments.class);
+        tEnv().executeSql(
+                        "INSERT INTO SinkTable "
+                                + "SELECT T1.s FROM 
TABLE(NamedArgumentsTableFunctionWithOptionalArguments(in2 => 'str2')) AS 
T1(s)")
+                .await();
+
+        
assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
+    }
+
     @Test
     void testNamedArgumentsScalarFunction() throws Exception {
         final List<Row> sourceData =
@@ -1124,6 +1143,29 @@ public class FunctionITCase extends StreamingTestBase {
                         "SQL validation failed. Could not find the argument 
names. Currently named arguments are not supported for varArgs and multi 
different argument names with overload function");
     }
 
+    @Test
+    void testNamedArgumentsScalarFunctionWithOptionalArguments() throws 
Exception {
+        final List<Row> sinkData =
+                Arrays.asList(Row.of("s1: null", "null: s2", "s1: s2", "null: 
null"));
+        TestCollectionTableFactory.reset();
+
+        tEnv().executeSql(
+                        "CREATE TABLE TestTable(s1 STRING, s2 STRING, s3 
STRING, s4 STRING) WITH ('connector' = 'COLLECTION')");
+
+        tEnv().createTemporarySystemFunction(
+                        "NamedArgumentsScalarFunctionWithOptionalArguments",
+                        
NamedArgumentsScalarFunctionWithOptionalArguments.class);
+        tEnv().executeSql(
+                        "INSERT INTO TestTable SELECT"
+                                + " 
NamedArgumentsScalarFunctionWithOptionalArguments(in1 => 's1') as s1,"
+                                + " 
NamedArgumentsScalarFunctionWithOptionalArguments(in2 => 's2') as s2,"
+                                + " 
NamedArgumentsScalarFunctionWithOptionalArguments(in1 => 's1', in2 => 's2') as 
s3,"
+                                + " 
NamedArgumentsScalarFunctionWithOptionalArguments() as s4")
+                .await();
+
+        assertThat(TestCollectionTableFactory.getResult()).isEqualTo(sinkData);
+    }
+
     @Test
     void testNamedArgumentAggregateFunction() throws Exception {
         final List<Row> sourceData =
@@ -1133,7 +1175,8 @@ public class FunctionITCase extends StreamingTestBase {
                         Row.of(LocalDateTime.parse("2007-12-03T10:15:32"), 
"e", "f", 5, 6),
                         Row.of(LocalDateTime.parse("2007-12-03T10:15:32"), 
"gg", "hh", 7, 88));
 
-        final List<Row> sinkData = Arrays.asList(Row.of("a:b", "b:a"), 
Row.of("gg:hh", "hh:gg"));
+        final List<Row> sinkData =
+                Arrays.asList(Row.of("a: b", "b: a"), Row.of("gg: hh", "hh: 
gg"));
 
         TestCollectionTableFactory.reset();
         TestCollectionTableFactory.initData(sourceData);
@@ -1158,6 +1201,42 @@ public class FunctionITCase extends StreamingTestBase {
         assertThat(TestCollectionTableFactory.getResult()).isEqualTo(sinkData);
     }
 
+    @Test
+    void testNamedArgumentAggregateFunctionWithOptionalArguments() throws 
Exception {
+        final List<Row> sourceData =
+                Arrays.asList(
+                        Row.of(LocalDateTime.parse("2007-12-03T10:15:30"), 
"a", "b", 1, 2),
+                        Row.of(LocalDateTime.parse("2007-12-03T10:15:30"), 
"c", "d", 33, 44),
+                        Row.of(LocalDateTime.parse("2007-12-03T10:15:32"), 
"e", "f", 5, 6),
+                        Row.of(LocalDateTime.parse("2007-12-03T10:15:32"), 
"gg", "hh", 7, 88));
+
+        final List<Row> sinkData =
+                Arrays.asList(Row.of("a: null", "null: b"), Row.of("gg: null", 
"null: hh"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        tEnv().executeSql(
+                        "CREATE TABLE SourceTable(ts TIMESTAMP(3), s1 STRING, 
s2 STRING, i1 INT, i2 INT, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND) "
+                                + "WITH ('connector' = 'COLLECTION')");
+        tEnv().executeSql(
+                        "CREATE TABLE SinkTable(s1 STRING, s2 STRING) WITH 
('connector' = 'COLLECTION')");
+
+        tEnv().createTemporarySystemFunction(
+                        "NamedArgumentAggregateFunctionWithOptionalArguments",
+                        
NamedArgumentAggregateFunctionWithOptionalArguments.class);
+
+        tEnv().executeSql(
+                        "INSERT INTO SinkTable "
+                                + "SELECT 
NamedArgumentAggregateFunctionWithOptionalArguments(in1 => s1), "
+                                + 
"NamedArgumentAggregateFunctionWithOptionalArguments(in2 => s2) "
+                                + "FROM SourceTable "
+                                + "GROUP BY TUMBLE(ts, INTERVAL '1' SECOND)")
+                .await();
+
+        assertThat(TestCollectionTableFactory.getResult()).isEqualTo(sinkData);
+    }
+
     @Test
     public void testInvalidUseOfScalarFunction() {
         tEnv().executeSql(
@@ -1527,6 +1606,19 @@ public class FunctionITCase extends StreamingTestBase {
                     @ArgumentHint(name = "in1", type = 
@DataTypeHint("string")),
                     @ArgumentHint(name = "in2", type = @DataTypeHint("string"))
                 })
+        public String eval(String arg1, String arg2) {
+            return (arg1 + ":" + arg2);
+        }
+    }
+
+    /** Function with optional arguments. */
+    public static class NamedArgumentsScalarFunctionWithOptionalArguments 
extends ScalarFunction {
+        @FunctionHint(
+                output = @DataTypeHint("STRING"),
+                argument = {
+                    @ArgumentHint(name = "in1", type = 
@DataTypeHint("STRING"), isOptional = true),
+                    @ArgumentHint(name = "in2", type = 
@DataTypeHint("STRING"), isOptional = true)
+                })
         public String eval(String arg1, String arg2) {
             return (arg1 + ": " + arg2);
         }
@@ -1659,6 +1751,20 @@ public class FunctionITCase extends StreamingTestBase {
         }
     }
 
+    /** Table function with optional named arguments that returns a string. */
+    public static class NamedArgumentsTableFunctionWithOptionalArguments
+            extends TableFunction<Object> {
+        @FunctionHint(
+                argument = {
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = 
"in1", isOptional = true),
+                    @ArgumentHint(type = @DataTypeHint("STRING"), name = 
"in2", isOptional = true)
+                },
+                output = @DataTypeHint("STRING"))
+        public void eval(String arg1, String arg2) {
+            collect(arg1 + ", " + arg2);
+        }
+    }
+
     /**
      * Function that returns which method has been called.
      *
@@ -1790,7 +1896,37 @@ public class FunctionITCase extends StreamingTestBase {
             if (arg1 == null || arg2 == null) {
                 return;
             }
-            String value = arg1 + ":" + arg2;
+            String value = arg1 + ": " + arg2;
+            final String longestString = (String) acc.getField(0);
+            if (longestString == null || longestString.length() < 
value.length()) {
+                acc.setField(0, value);
+            }
+        }
+
+        @Override
+        public String getValue(Row acc) {
+            return (String) acc.getField(0);
+        }
+    }
+
+    /** Function that aggregates strings and finds the longest string. */
+    public static class NamedArgumentAggregateFunctionWithOptionalArguments
+            extends AggregateFunction<String, Row> {
+
+        @Override
+        public Row createAccumulator() {
+            return Row.of((String) null);
+        }
+
+        @FunctionHint(
+                output = @DataTypeHint("STRING"),
+                argument = {
+                    @ArgumentHint(name = "in1", type = 
@DataTypeHint("STRING"), isOptional = true),
+                    @ArgumentHint(name = "in2", type = 
@DataTypeHint("STRING"), isOptional = true)
+                },
+                accumulator = @DataTypeHint("ROW<longestString STRING>"))
+        public void accumulate(Row acc, String arg1, String arg2) {
+            String value = arg1 + ": " + arg2;
             final String longestString = (String) acc.getField(0);
             if (longestString == null || longestString.length() < 
value.length()) {
                 acc.setField(0, value);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
index fb6a0f1dae5..dc5e2319b4d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/ProcedureITCase.java
@@ -88,7 +88,7 @@ class ProcedureITCase extends StreamingTestBase {
                         tEnv().executeSql("show procedures in 
`system`").collect());
         assertThat(rows.toString())
                 .isEqualTo(
-                        "[+I[generate_n], +I[generate_user], +I[get_year], 
+I[named_args], +I[named_args_overload], +I[sum_n]]");
+                        "[+I[generate_n], +I[generate_user], +I[get_year], 
+I[named_args], +I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
 
         // show procedure with like
         rows =
@@ -115,7 +115,8 @@ class ProcedureITCase extends StreamingTestBase {
                         tEnv().executeSql("show procedures in `system` not 
like 'generate%'")
                                 .collect());
         assertThat(rows.toString())
-                .isEqualTo("[+I[get_year], +I[named_args], 
+I[named_args_overload], +I[sum_n]]");
+                .isEqualTo(
+                        "[+I[get_year], +I[named_args], 
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
 
         // show procedure with not ilike
         rows =
@@ -123,7 +124,8 @@ class ProcedureITCase extends StreamingTestBase {
                         tEnv().executeSql("show procedures in `system` not 
ilike 'generaTe%'")
                                 .collect());
         assertThat(rows.toString())
-                .isEqualTo("[+I[get_year], +I[named_args], 
+I[named_args_overload], +I[sum_n]]");
+                .isEqualTo(
+                        "[+I[get_year], +I[named_args], 
+I[named_args_optional], +I[named_args_overload], +I[sum_n]]");
     }
 
     @Test
@@ -200,12 +202,12 @@ class ProcedureITCase extends StreamingTestBase {
     }
 
     @Test
-    void testNamedArgumentsWithDefaultValue() {
-        // default value
-        Assertions.assertThatThrownBy(
-                        () -> tEnv().executeSql("call `system`.named_args(c => 
'yuxia')"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("No match found for function signature 
named_args");
+    void testNamedArgumentsWithOptionalArguments() {
+        TableResult tableResult = tEnv().executeSql("call 
`system`.named_args_optional(d => 19)");
+        verifyTableResult(
+                tableResult,
+                Collections.singletonList(Row.of("null, 19")),
+                ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING())));
     }
 
     private void verifyTableResult(


Reply via email to