This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b71dd7695ca [FLINK-36707][table] Add output strategy to SystemTypeInference
b71dd7695ca is described below

commit b71dd7695caca4beed63cb7fc9afe36ea8c83710
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Jan 14 12:48:45 2025 +0100

    [FLINK-36707][table] Add output strategy to SystemTypeInference
    
    This closes #25972.
---
 .../flink/table/annotation/ArgumentTrait.java      |  33 +++---
 .../table/types/inference/StaticArgumentTrait.java |   3 +-
 .../table/types/inference/SystemTypeInference.java | 114 ++++++++++++++++-----
 .../calcite/sql/validate/ProcedureNamespace.java   |  13 ++-
 .../apache/calcite/sql2rel/SqlToRelConverter.java  |  16 ++-
 .../table/planner/calcite/FlinkSqlCallBinding.java |   7 +-
 .../functions/bridging/BridgingSqlFunction.java    |  10 ++
 .../inference/TypeInferenceReturnInference.java    |  33 +++---
 .../planner/functions/sql/SqlDefaultOperator.java  |   7 +-
 .../plan/stream/sql/ProcessTableFunctionTest.java  |  25 +++++
 10 files changed, 196 insertions(+), 65 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
index fe161faac7c..8704a3c440b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
@@ -22,10 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.types.inference.StaticArgumentTrait;
 
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 /**
  * Declares traits for {@link ArgumentHint}. They enable basic validation by the framework.
  *
@@ -83,26 +79,39 @@ public enum ArgumentTrait {
      * Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is
      * mandatory for improving the parallel execution by distributing the table by key.
      */
-    OPTIONAL_PARTITION_BY(false, StaticArgumentTrait.OPTIONAL_PARTITION_BY, TABLE_AS_SET);
+    OPTIONAL_PARTITION_BY(false, StaticArgumentTrait.OPTIONAL_PARTITION_BY),
+
+    /**
+     * Defines that all columns of a table argument (i.e. {@link #TABLE_AS_ROW} or {@link
+     * #TABLE_AS_SET}) are included in the output of the PTF. By default, only columns of the
+     * PARTITION BY clause are passed through.
+     *
+     * <p>Given a table t (containing columns k and v), and a PTF f() (producing columns c1 and c2),
+     * the output of a {@code SELECT * FROM f(tableArg => TABLE t PARTITION BY k)} uses the
+     * following order:
+     *
+     * <pre>
+     *     Default: | k | c1 | c2 |
+     *     With pass-through columns: | k | v | c1 | c2 |
+     * </pre>
+     *
+     * <p>In case of multiple table arguments, pass-through columns are added according to the
+     * declaration order in the PTF signature.
+     */
+    PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH);
 
     private final boolean isRoot;
     private final StaticArgumentTrait staticTrait;
-    private final Set<ArgumentTrait> requirements;
 
-    ArgumentTrait(boolean isRoot, StaticArgumentTrait staticTrait, ArgumentTrait... requirements) {
+    ArgumentTrait(boolean isRoot, StaticArgumentTrait staticTrait) {
         this.isRoot = isRoot;
         this.staticTrait = staticTrait;
-        this.requirements = Arrays.stream(requirements).collect(Collectors.toSet());
     }
 
     public boolean isRoot() {
         return isRoot;
     }
 
-    public Set<ArgumentTrait> getRequirements() {
-        return requirements;
-    }
-
     public StaticArgumentTrait toStaticTrait() {
         return staticTrait;
     }
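
For illustration, a minimal sketch (not part of the commit) of how a PTF author
would opt into the new trait, modeled on the test functions added at the end of
this diff. The class name PassThroughFunction and the table t(k, v) are
hypothetical:

    import org.apache.flink.table.annotation.ArgumentHint;
    import org.apache.flink.table.functions.ProcessTableFunction;
    import org.apache.flink.types.Row;

    import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
    import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;

    /** Hypothetical PTF that forwards all columns of its table argument. */
    public class PassThroughFunction extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint({TABLE_AS_SET, PASS_COLUMNS_THROUGH}) Row r) {
            // body omitted; only the declaration matters for the output schema
        }
    }

Per the javadoc above, SELECT * FROM PassThroughFunction(r => TABLE t PARTITION BY k)
would then produce | k | v | EXPR$0 | instead of the default | k | EXPR$0 |
(see deriveFunctionOutputFields below for the EXPR$0 naming).
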
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
index 0590d21a340..b9f4d4c71fb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
@@ -37,7 +37,8 @@ public enum StaticArgumentTrait {
     MODEL(),
     TABLE_AS_ROW(TABLE),
     TABLE_AS_SET(TABLE),
-    OPTIONAL_PARTITION_BY(TABLE_AS_SET);
+    OPTIONAL_PARTITION_BY(TABLE_AS_SET),
+    PASS_COLUMNS_THROUGH(TABLE);
 
     private final Set<StaticArgumentTrait> requirements;
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index ba89b5e7966..6c64b32ec25 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.DataTypes.Field;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.functions.ProcessTableFunction;
@@ -31,7 +32,9 @@ import org.apache.flink.table.types.DataType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -39,6 +42,7 @@ import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 /**
  * Extends the {@link TypeInference} function-aware by additional system columns and validation.
@@ -72,7 +76,8 @@ public class SystemTypeInference {
                 deriveSystemInputStrategy(functionKind, systemArgs, origin.getInputTypeStrategy()));
         builder.stateTypeStrategies(origin.getStateTypeStrategies());
         builder.outputTypeStrategy(
-                deriveSystemOutputStrategy(functionKind, origin.getOutputTypeStrategy()));
+                deriveSystemOutputStrategy(
+                        functionKind, systemArgs, origin.getOutputTypeStrategy()));
         return builder.build();
     }
 
@@ -137,18 +142,22 @@ public class SystemTypeInference {
     }
 
     private static TypeStrategy deriveSystemOutputStrategy(
-            FunctionKind functionKind, TypeStrategy outputStrategy) {
+            FunctionKind functionKind,
+            @Nullable List<StaticArgument> staticArgs,
+            TypeStrategy outputStrategy) {
         if (functionKind != FunctionKind.TABLE && functionKind != FunctionKind.PROCESS_TABLE) {
             return outputStrategy;
         }
-        return new SystemOutputStrategy(outputStrategy);
+        return new SystemOutputStrategy(staticArgs, outputStrategy);
     }
 
     private static class SystemOutputStrategy implements TypeStrategy {
 
+        private final List<StaticArgument> staticArgs;
         private final TypeStrategy origin;
 
-        private SystemOutputStrategy(TypeStrategy origin) {
+        private SystemOutputStrategy(List<StaticArgument> staticArgs, TypeStrategy origin) {
+            this.staticArgs = staticArgs;
             this.origin = origin;
         }
 
@@ -156,30 +165,87 @@ public class SystemTypeInference {
         public Optional<DataType> inferType(CallContext callContext) {
             return origin.inferType(callContext)
                     .map(
-                            dataType -> {
-                                final List<DataType> fieldTypes =
-                                        DataType.getFieldDataTypes(dataType);
-                                final List<String> fieldNames = DataType.getFieldNames(dataType);
+                            functionDataType -> {
                                 final List<Field> fields = new ArrayList<>();
-                                if (fieldTypes.isEmpty()) {
-                                    // Before the system type inference was introduced, SQL and
-                                    // Table API chose a different default field name.
-                                    // EXPR$0 is chosen for best-effort backwards compatibility for
-                                    // SQL users.
-                                    fields.add(DataTypes.FIELD("EXPR$0", dataType));
-                                } else {
-                                    IntStream.range(0, fieldTypes.size())
-                                            .mapToObj(
-                                                    pos ->
-                                                            DataTypes.FIELD(
-                                                                    fieldNames.get(pos),
-                                                                    fieldTypes.get(pos)))
-                                            .forEach(fields::add);
-                                }
 
-                                return DataTypes.ROW(fields).notNull();
+                                // According to the SQL standard, pass-through columns should
+                                // actually be added at the end of the output row type. However,
+                                // looking at the overall landscape we deviate from the standard in
+                                // this regard:
+                                // - Calcite built-in window functions add them at the beginning
+                                // - MATCH_RECOGNIZE adds PARTITION BY columns at the beginning
+                                // - Flink SESSION windows add pass-through columns at the beginning
+                                // - Oracle adds pass-through columns for all ROW semantics args, so
+                                // this whole topic is kind of vendor specific already
+                                fields.addAll(derivePassThroughFields(callContext));
+                                fields.addAll(deriveFunctionOutputFields(functionDataType));
+
+                                final List<Field> uniqueFields = makeFieldNamesUnique(fields);
+
+                                return DataTypes.ROW(uniqueFields).notNull();
                             });
         }
+
+        private List<Field> makeFieldNamesUnique(List<Field> fields) {
+            final Map<String, Integer> fieldCount = new HashMap<>();
+            return fields.stream()
+                    .map(
+                            item -> {
+                                final int nextCount =
+                                        fieldCount.compute(
+                                                item.getName(),
+                                                (fieldName, count) ->
+                                                        count == null ? -1 : count + 1);
+                                final String newFieldName =
+                                        nextCount < 0 ? item.getName() : item.getName() + nextCount;
+                                return DataTypes.FIELD(newFieldName, item.getDataType());
+                            })
+                    .collect(Collectors.toList());
+        }
+
+        private List<Field> derivePassThroughFields(CallContext callContext) {
+            if (staticArgs == null) {
+                return List.of();
+            }
+            final List<DataType> argDataTypes = callContext.getArgumentDataTypes();
+            return IntStream.range(0, staticArgs.size())
+                    .mapToObj(
+                            pos -> {
+                                final StaticArgument arg = staticArgs.get(pos);
+                                if (arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
+                                    return DataType.getFields(argDataTypes.get(pos)).stream();
+                                }
+                                if (!arg.is(StaticArgumentTrait.TABLE_AS_SET)) {
+                                    return Stream.<Field>empty();
+                                }
+                                final TableSemantics semantics =
+                                        callContext
+                                                .getTableSemantics(pos)
+                                                .orElseThrow(IllegalStateException::new);
+                                final DataType projectedRow =
+                                        Projection.of(semantics.partitionByColumns())
+                                                .project(argDataTypes.get(pos));
+                                return DataType.getFields(projectedRow).stream();
+                            })
+                    .flatMap(s -> s)
+                    .collect(Collectors.toList());
+        }
+
+        private List<Field> deriveFunctionOutputFields(DataType functionDataType) {
+            final List<DataType> fieldTypes = DataType.getFieldDataTypes(functionDataType);
+            final List<String> fieldNames = DataType.getFieldNames(functionDataType);
+
+            if (fieldTypes.isEmpty()) {
+                // Before the system type inference was introduced, SQL and
+                // Table API chose a different default field name.
+                // EXPR$0 is chosen for best-effort backwards compatibility for
+                // SQL users.
+                return List.of(DataTypes.FIELD("EXPR$0", functionDataType));
+            }
+            return IntStream.range(0, fieldTypes.size())
+                    .mapToObj(pos -> DataTypes.FIELD(fieldNames.get(pos), fieldTypes.get(pos)))
+                    .collect(Collectors.toList());
+        }
     }
 
     private static class SystemInputStrategy implements InputTypeStrategy {
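
For illustration, a standalone sketch of the renaming rule implemented by
makeFieldNamesUnique above: the first occurrence of a name is kept, later
duplicates get a zero-based numeric suffix. The class and method names here
are hypothetical; only the counter logic mirrors the commit:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class FieldNameDedupSketch {
        static List<String> makeUnique(List<String> names) {
            final Map<String, Integer> fieldCount = new HashMap<>();
            return names.stream()
                    .map(
                            name -> {
                                // first occurrence maps to -1, duplicates count up from 0
                                final int nextCount =
                                        fieldCount.compute(
                                                name, (n, c) -> c == null ? -1 : c + 1);
                                return nextCount < 0 ? name : name + nextCount;
                            })
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // a pass-through column "k" colliding with a function output column "k"
            System.out.println(makeUnique(List.of("k", "v", "k", "c")));
            // prints [k, v, k0, c]
        }
    }
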
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
index ce11073193c..fabcac75522 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
@@ -57,17 +57,22 @@ public final class ProcedureNamespace extends AbstractNamespace {
 
     public RelDataType validateImpl(RelDataType targetRowType) {
         validator.inferUnknownTypes(validator.unknownType, scope, call);
+
         final SqlOperator operator = call.getOperator();
         final SqlCallBinding callBinding = new FlinkSqlCallBinding(validator, scope, call);
+        final SqlCall permutedCall = callBinding.permutedCall();
+        if (operator instanceof SqlWindowTableFunction) {
+            permutedCall.validate(validator, scope);
+        }
+
         // The result is ignored but the type is derived to trigger the function resolution
-        validator.deriveTypeImpl(scope, callBinding.permutedCall());
+        validator.deriveTypeImpl(scope, permutedCall);
+
         if (!(operator instanceof SqlTableFunction)) {
             throw new IllegalArgumentException(
                     "Argument must be a table function: " + operator.getNameAsId());
         }
-        if (operator instanceof SqlWindowTableFunction) {
-            callBinding.permutedCall().validate(validator, scope);
-        }
+
         final SqlTableFunction tableFunction = (SqlTableFunction) operator;
         final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference();
         return requireNonNull(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 3d356d66681..a4d4a08772d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -249,8 +249,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *   <li>Added in FLINK-32474: Lines 2499 ~ 2501
  *   <li>Added in FLINK-32474: Lines 2906 ~ 2918
  *   <li>Added in FLINK-32474: Lines 3019 ~ 3053
- *   <li>Added in FLINK-34312: Lines 5693 ~ 5696
- *   <li>Added in FLINK-34057, FLINK-34058, FLINK-34312: Lines 6144 ~ 6162
+ *   <li>Added in FLINK-34312: Lines 5804 ~ 5813
+ *   <li>Added in FLINK-34057, FLINK-34058, FLINK-34312: Lines 6263 ~ 6279
  * </ol>
  */
 @SuppressWarnings("UnstableApiUsage")
@@ -5801,8 +5801,16 @@ public class SqlToRelConverter {
                 }
             }
             // ----- FLINK MODIFICATION BEGIN -----
-            return exprConverter.convertCall(
-                    this, new FlinkSqlCallBinding(validator(), scope, call).permutedCall());
+            final SqlCall permutedCall =
+                    new FlinkSqlCallBinding(validator(), scope, call).permutedCall();
+            final RelDataType typeIfKnown = validator().getValidatedNodeTypeIfKnown(call);
+            if (typeIfKnown != null) {
+                // Argument permutation should not affect the output type,
+                // reset it if it was known. Otherwise, the type inference would be called twice
+                // when converting to RexNode.
+                validator().setValidatedNodeType(permutedCall, typeIfKnown);
+            }
+            return exprConverter.convertCall(this, permutedCall);
             // ----- FLINK MODIFICATION END -----
         }
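
For illustration, the caching pattern used in the modification above, reduced
to a generic, runnable sketch (hypothetical class; Calcite's validator type
map is replaced by a plain HashMap): a rewritten but semantically identical
node reuses the type already computed for the original node instead of
triggering inference a second time.

    import java.util.HashMap;
    import java.util.Map;

    public class ValidatedTypeCacheSketch {
        private final Map<Object, String> validatedTypes = new HashMap<>();

        // stand-in for an expensive type inference
        String deriveType(Object node) {
            return validatedTypes.computeIfAbsent(node, n -> "ROW<...>");
        }

        // rewriting (e.g. argument permutation) does not change the type,
        // so transfer the known type to the rewritten node
        void transferType(Object original, Object rewritten) {
            final String typeIfKnown = validatedTypes.get(original);
            if (typeIfKnown != null) {
                validatedTypes.put(rewritten, typeIfKnown);
            }
        }
    }
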
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java
index ce901c7c00a..4f5ffe9681f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java
@@ -105,9 +105,10 @@ public class FlinkSqlCallBinding extends SqlCallBinding {
         for (SqlNode operand : super.operands()) {
             if (operand instanceof SqlCall
                     && ((SqlCall) operand).getOperator() == SqlStdOperatorTable.DEFAULT) {
-                rewrittenOperands.add(
-                        new SqlDefaultOperator(fixedArgumentTypes.get(rewrittenOperands.size()))
-                                .createCall(SqlParserPos.ZERO));
+                final RelDataType argumentType = fixedArgumentTypes.get(rewrittenOperands.size());
+                final SqlCall defaultArg =
+                        new SqlDefaultOperator(argumentType).createCall(SqlParserPos.ZERO);
+                rewrittenOperands.add(defaultArg);
             } else {
                 rewrittenOperands.add(operand);
             }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index d620741b330..fb4833f55b3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -279,5 +279,15 @@ public class BridgingSqlFunction extends SqlFunction {
             }
             return TableCharacteristic.builder(semantics).build();
         }
+
+        @Override
+        public boolean argumentMustBeScalar(int ordinal) {
+            final List<StaticArgument> args = typeInference.getStaticArguments().orElse(null);
+            if (args == null || ordinal >= args.size()) {
+                return true;
+            }
+            final StaticArgument arg = args.get(ordinal);
+            return !arg.is(StaticArgumentTrait.TABLE);
+        }
     }
 }
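
For illustration, a self-contained sketch of the decision implemented by the
argumentMustBeScalar override above, with the StaticArgument lookup replaced
by a plain boolean list (all names here are hypothetical):

    import java.util.List;

    public class ArgumentMustBeScalarSketch {
        // A declared TABLE argument may be a table expression; every other
        // argument, and any ordinal beyond the declared arguments, stays scalar.
        static boolean argumentMustBeScalar(List<Boolean> isTableArg, int ordinal) {
            if (ordinal >= isTableArg.size()) {
                return true;
            }
            return !isTableArg.get(ordinal);
        }

        public static void main(String[] args) {
            // for a call like f(r => TABLE t1, i => 1):
            System.out.println(argumentMustBeScalar(List.of(true, false), 0)); // false
            System.out.println(argumentMustBeScalar(List.of(true, false), 1)); // true
        }
    }
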
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
index 8bb177b6535..3b66f91a34b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java
@@ -34,8 +34,6 @@ import org.apache.calcite.sql.SqlCallBinding;
 import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 
-import javax.annotation.Nullable;
-
 import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
 import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException;
 import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException;
@@ -65,15 +63,10 @@ public final class TypeInferenceReturnInference implements SqlReturnTypeInferenc
     }
 
     @Override
-    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
-        final CallContext callContext =
-                new OperatorBindingCallContext(
-                        dataTypeFactory,
-                        definition,
-                        opBinding,
-                        extractExpectedOutputType(opBinding));
+    public RelDataType inferReturnType(SqlOperatorBinding binding) {
+        final CallContext callContext = createCallContext(binding);
         try {
-            return inferReturnTypeOrError(unwrapTypeFactory(opBinding), callContext);
+            return inferReturnTypeOrError(unwrapTypeFactory(binding), callContext);
         } catch (ValidationException e) {
             throw createInvalidCallException(callContext, e);
         } catch (Throwable t) {
@@ -83,14 +76,22 @@ public final class TypeInferenceReturnInference implements SqlReturnTypeInferenc
 
     // --------------------------------------------------------------------------------------------
 
-    private @Nullable RelDataType extractExpectedOutputType(SqlOperatorBinding opBinding) {
-        if (opBinding instanceof SqlCallBinding) {
-            final SqlCallBinding binding = (SqlCallBinding) opBinding;
+    private CallContext createCallContext(SqlOperatorBinding binding) {
+        if (binding instanceof SqlCallBinding) {
+            final SqlCallBinding callBinding = (SqlCallBinding) binding;
             final FlinkCalciteSqlValidator validator =
-                    (FlinkCalciteSqlValidator) binding.getValidator();
-            return validator.getExpectedOutputType(binding.getCall()).orElse(null);
+                    (FlinkCalciteSqlValidator) callBinding.getValidator();
+            final RelDataType expectedOutputType =
+                    validator.getExpectedOutputType(callBinding.getCall()).orElse(null);
+            return new CallBindingCallContext(
+                    dataTypeFactory,
+                    definition,
+                    callBinding,
+                    expectedOutputType,
+                    typeInference.getStaticArguments().orElse(null));
+        } else {
+            return new OperatorBindingCallContext(dataTypeFactory, definition, binding, null);
         }
-        return null;
     }
 
     private RelDataType inferReturnTypeOrError(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java
index 05cf3c948ed..2898565535d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.functions.sql;
 
+import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
+
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlKind;
@@ -29,7 +31,10 @@ import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 
-/** Default operator has specified type. */
+/**
+ * Marker for optional arguments inserted by {@link FlinkSqlCallBinding}. Compared to Calcite, this
+ * operator stores its type.
+ */
 public class SqlDefaultOperator extends SqlSpecialOperator {
 
     private final RelDataType returnType;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
index c5039da5730..8aeee6ca5c0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
@@ -45,6 +45,7 @@ import java.util.stream.Stream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
+import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
 import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW;
 import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -134,6 +135,18 @@ public class ProcessTableFunctionTest extends TableTestBase {
                 "SELECT * FROM f(input => TABLE t1, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')");
     }
 
+    @Test
+    void testTableAsSetPassThroughColumns() {
+        util.addTemporarySystemFunction("f", TableAsSetPassThroughFunction.class);
+        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)");
+    }
+
+    @Test
+    void testTableAsRowPassThroughColumns() {
+        util.addTemporarySystemFunction("f", TableAsRowPassThroughFunction.class);
+        assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+    }
+
     @ParameterizedTest
     @MethodSource("errorSpecs")
     void testErrorBehavior(ErrorSpec spec) {
@@ -259,6 +272,18 @@ public class ProcessTableFunctionTest extends TableTestBase {
         public void eval(@ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row r, Integer i) {}
     }
 
+    /** Testing function. */
+    public static class TableAsRowPassThroughFunction extends ProcessTableFunction<String> {
+        @SuppressWarnings("unused")
+        public void eval(@ArgumentHint({TABLE_AS_ROW, PASS_COLUMNS_THROUGH}) Row r, Integer i) {}
+    }
+
+    /** Testing function. */
+    public static class TableAsSetPassThroughFunction extends ProcessTableFunction<String> {
+        @SuppressWarnings("unused")
+        public void eval(@ArgumentHint({TABLE_AS_SET, PASS_COLUMNS_THROUGH}) Row r, Integer i) {}
+    }
+
     /** Testing function. */
     public static class NoProcessTableFunction extends TableFunction<String> {
 
