This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b71dd7695ca [FLINK-36707][table] Add output strategy to SystemTypeInference b71dd7695ca is described below commit b71dd7695caca4beed63cb7fc9afe36ea8c83710 Author: Timo Walther <twal...@apache.org> AuthorDate: Tue Jan 14 12:48:45 2025 +0100 [FLINK-36707][table] Add output strategy to SystemTypeInference This closes #25972. --- .../flink/table/annotation/ArgumentTrait.java | 33 +++--- .../table/types/inference/StaticArgumentTrait.java | 3 +- .../table/types/inference/SystemTypeInference.java | 114 ++++++++++++++++----- .../calcite/sql/validate/ProcedureNamespace.java | 13 ++- .../apache/calcite/sql2rel/SqlToRelConverter.java | 16 ++- .../table/planner/calcite/FlinkSqlCallBinding.java | 7 +- .../functions/bridging/BridgingSqlFunction.java | 10 ++ .../inference/TypeInferenceReturnInference.java | 33 +++--- .../planner/functions/sql/SqlDefaultOperator.java | 7 +- .../plan/stream/sql/ProcessTableFunctionTest.java | 25 +++++ 10 files changed, 196 insertions(+), 65 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java index fe161faac7c..8704a3c440b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java @@ -22,10 +22,6 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.functions.ProcessTableFunction; import org.apache.flink.table.types.inference.StaticArgumentTrait; -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Collectors; - /** * Declares traits for {@link ArgumentHint}. They enable basic validation by the framework. * @@ -83,26 +79,39 @@ public enum ArgumentTrait { * Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is * mandatory for improving the parallel execution by distributing the table by key. */ - OPTIONAL_PARTITION_BY(false, StaticArgumentTrait.OPTIONAL_PARTITION_BY, TABLE_AS_SET); + OPTIONAL_PARTITION_BY(false, StaticArgumentTrait.OPTIONAL_PARTITION_BY), + + /** + * Defines that all columns of a table argument (i.e. {@link #TABLE_AS_ROW} or {@link + * #TABLE_AS_SET}) are included in the output of the PTF. By default, only columns of the + * PARTITION BY clause are passed through. + * + * <p>Given a table t (containing columns k and v), and a PTF f() (producing columns c1 and c2), + * the output of a {@code SELECT * FROM f(tableArg => TABLE t PARTITION BY k)} uses the + * following order: + * + * <pre> + * Default: | k | c1 | c2 | + * With pass-through columns: | k | v | c1 | c2 | + * </pre> + * + * <p>In case of multiple table arguments, pass-through columns are added according to the + * declaration order in the PTF signature. + */ + PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH); private final boolean isRoot; private final StaticArgumentTrait staticTrait; - private final Set<ArgumentTrait> requirements; - ArgumentTrait(boolean isRoot, StaticArgumentTrait staticTrait, ArgumentTrait... requirements) { + ArgumentTrait(boolean isRoot, StaticArgumentTrait staticTrait) { this.isRoot = isRoot; this.staticTrait = staticTrait; - this.requirements = Arrays.stream(requirements).collect(Collectors.toSet()); } public boolean isRoot() { return isRoot; } - public Set<ArgumentTrait> getRequirements() { - return requirements; - } - public StaticArgumentTrait toStaticTrait() { return staticTrait; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java index 0590d21a340..b9f4d4c71fb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java @@ -37,7 +37,8 @@ public enum StaticArgumentTrait { MODEL(), TABLE_AS_ROW(TABLE), TABLE_AS_SET(TABLE), - OPTIONAL_PARTITION_BY(TABLE_AS_SET); + OPTIONAL_PARTITION_BY(TABLE_AS_SET), + PASS_COLUMNS_THROUGH(TABLE); private final Set<StaticArgumentTrait> requirements; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java index ba89b5e7966..6c64b32ec25 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.Projection; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.functions.ProcessTableFunction; @@ -31,7 +32,9 @@ import org.apache.flink.table.types.DataType; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -39,6 +42,7 @@ import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; /** * Extends the {@link TypeInference} function-aware by additional system columns and validation. @@ -72,7 +76,8 @@ public class SystemTypeInference { deriveSystemInputStrategy(functionKind, systemArgs, origin.getInputTypeStrategy())); builder.stateTypeStrategies(origin.getStateTypeStrategies()); builder.outputTypeStrategy( - deriveSystemOutputStrategy(functionKind, origin.getOutputTypeStrategy())); + deriveSystemOutputStrategy( + functionKind, systemArgs, origin.getOutputTypeStrategy())); return builder.build(); } @@ -137,18 +142,22 @@ public class SystemTypeInference { } private static TypeStrategy deriveSystemOutputStrategy( - FunctionKind functionKind, TypeStrategy outputStrategy) { + FunctionKind functionKind, + @Nullable List<StaticArgument> staticArgs, + TypeStrategy outputStrategy) { if (functionKind != FunctionKind.TABLE && functionKind != FunctionKind.PROCESS_TABLE) { return outputStrategy; } - return new SystemOutputStrategy(outputStrategy); + return new SystemOutputStrategy(staticArgs, outputStrategy); } private static class SystemOutputStrategy implements TypeStrategy { + private final List<StaticArgument> staticArgs; private final TypeStrategy origin; - private SystemOutputStrategy(TypeStrategy origin) { + private SystemOutputStrategy(List<StaticArgument> staticArgs, TypeStrategy origin) { + this.staticArgs = staticArgs; this.origin = origin; } @@ -156,30 +165,87 @@ public class SystemTypeInference { public Optional<DataType> inferType(CallContext callContext) { return origin.inferType(callContext) .map( - dataType -> { - final List<DataType> fieldTypes = - DataType.getFieldDataTypes(dataType); - final List<String> fieldNames = DataType.getFieldNames(dataType); + functionDataType -> { final List<Field> fields = new ArrayList<>(); - if (fieldTypes.isEmpty()) { - // Before the system type inference was introduced, SQL and - // Table API chose a different default field name. - // EXPR$0 is chosen for best-effort backwards compatibility for - // SQL users. - fields.add(DataTypes.FIELD("EXPR$0", dataType)); - } else { - IntStream.range(0, fieldTypes.size()) - .mapToObj( - pos -> - DataTypes.FIELD( - fieldNames.get(pos), - fieldTypes.get(pos))) - .forEach(fields::add); - } - return DataTypes.ROW(fields).notNull(); + // According to the SQL standard, pass-through columns should + // actually be added at the end of the output row type. However, + // looking at the overall landscape we deviate from the standard in + // this regard: + // - Calcite built-in window functions add them at the beginning + // - MATCH_RECOGNIZE adds PARTITION BY columns at the beginning + // - Flink SESSION windows add pass-through columns at the beginning + // - Oracle adds pass-through columns for all ROW semantics args, so + // this whole topic is kind of vendor specific already + fields.addAll(derivePassThroughFields(callContext)); + fields.addAll(deriveFunctionOutputFields(functionDataType)); + + final List<Field> uniqueFields = makeFieldNamesUnique(fields); + + return DataTypes.ROW(uniqueFields).notNull(); }); } + + private List<Field> makeFieldNamesUnique(List<Field> fields) { + final Map<String, Integer> fieldCount = new HashMap<>(); + return fields.stream() + .map( + item -> { + final int nextCount = + fieldCount.compute( + item.getName(), + (fieldName, count) -> + count == null ? -1 : count + 1); + final String newFieldName = + nextCount < 0 ? item.getName() : item.getName() + nextCount; + return DataTypes.FIELD(newFieldName, item.getDataType()); + }) + .collect(Collectors.toList()); + } + + private List<Field> derivePassThroughFields(CallContext callContext) { + if (staticArgs == null) { + return List.of(); + } + final List<DataType> argDataTypes = callContext.getArgumentDataTypes(); + return IntStream.range(0, staticArgs.size()) + .mapToObj( + pos -> { + final StaticArgument arg = staticArgs.get(pos); + if (arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) { + return DataType.getFields(argDataTypes.get(pos)).stream(); + } + if (!arg.is(StaticArgumentTrait.TABLE_AS_SET)) { + return Stream.<Field>empty(); + } + final TableSemantics semantics = + callContext + .getTableSemantics(pos) + .orElseThrow(IllegalStateException::new); + final DataType projectedRow = + Projection.of(semantics.partitionByColumns()) + .project(argDataTypes.get(pos)); + return DataType.getFields(projectedRow).stream(); + }) + .flatMap(s -> s) + .collect(Collectors.toList()); + } + + private List<Field> deriveFunctionOutputFields(DataType functionDataType) { + final List<DataType> fieldTypes = DataType.getFieldDataTypes(functionDataType); + final List<String> fieldNames = DataType.getFieldNames(functionDataType); + + if (fieldTypes.isEmpty()) { + // Before the system type inference was introduced, SQL and + // Table API chose a different default field name. + // EXPR$0 is chosen for best-effort backwards compatibility for + // SQL users. + return List.of(DataTypes.FIELD("EXPR$0", functionDataType)); + } + return IntStream.range(0, fieldTypes.size()) + .mapToObj(pos -> DataTypes.FIELD(fieldNames.get(pos), fieldTypes.get(pos))) + .collect(Collectors.toList()); + } } private static class SystemInputStrategy implements InputTypeStrategy { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java index ce11073193c..fabcac75522 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java @@ -57,17 +57,22 @@ public final class ProcedureNamespace extends AbstractNamespace { public RelDataType validateImpl(RelDataType targetRowType) { validator.inferUnknownTypes(validator.unknownType, scope, call); + final SqlOperator operator = call.getOperator(); final SqlCallBinding callBinding = new FlinkSqlCallBinding(validator, scope, call); + final SqlCall permutedCall = callBinding.permutedCall(); + if (operator instanceof SqlWindowTableFunction) { + permutedCall.validate(validator, scope); + } + // The result is ignored but the type is derived to trigger the function resolution - validator.deriveTypeImpl(scope, callBinding.permutedCall()); + validator.deriveTypeImpl(scope, permutedCall); + if (!(operator instanceof SqlTableFunction)) { throw new IllegalArgumentException( "Argument must be a table function: " + operator.getNameAsId()); } - if (operator instanceof SqlWindowTableFunction) { - callBinding.permutedCall().validate(validator, scope); - } + final SqlTableFunction tableFunction = (SqlTableFunction) operator; final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference(); return requireNonNull( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index 3d356d66681..a4d4a08772d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -249,8 +249,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <li>Added in FLINK-32474: Lines 2499 ~ 2501 * <li>Added in FLINK-32474: Lines 2906 ~ 2918 * <li>Added in FLINK-32474: Lines 3019 ~ 3053 - * <li>Added in FLINK-34312: Lines 5693 ~ 5696 - * <li>Added in FLINK-34057, FLINK-34058, FLINK-34312: Lines 6144 ~ 6162 + * <li>Added in FLINK-34312: Lines 5804 ~ 5813 + * <li>Added in FLINK-34057, FLINK-34058, FLINK-34312: Lines 6263 ~ 6279 * </ol> */ @SuppressWarnings("UnstableApiUsage") @@ -5801,8 +5801,16 @@ public class SqlToRelConverter { } } // ----- FLINK MODIFICATION BEGIN ----- - return exprConverter.convertCall( - this, new FlinkSqlCallBinding(validator(), scope, call).permutedCall()); + final SqlCall permutedCall = + new FlinkSqlCallBinding(validator(), scope, call).permutedCall(); + final RelDataType typeIfKnown = validator().getValidatedNodeTypeIfKnown(call); + if (typeIfKnown != null) { + // Argument permutation should not affect the output type, + // reset it if it was known. Otherwise, the type inference would be called twice + // when converting to RexNode. + validator().setValidatedNodeType(permutedCall, typeIfKnown); + } + return exprConverter.convertCall(this, permutedCall); // ----- FLINK MODIFICATION END ----- } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java index ce901c7c00a..4f5ffe9681f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java @@ -105,9 +105,10 @@ public class FlinkSqlCallBinding extends SqlCallBinding { for (SqlNode operand : super.operands()) { if (operand instanceof SqlCall && ((SqlCall) operand).getOperator() == SqlStdOperatorTable.DEFAULT) { - rewrittenOperands.add( - new SqlDefaultOperator(fixedArgumentTypes.get(rewrittenOperands.size())) - .createCall(SqlParserPos.ZERO)); + final RelDataType argumentType = fixedArgumentTypes.get(rewrittenOperands.size()); + final SqlCall defaultArg = + new SqlDefaultOperator(argumentType).createCall(SqlParserPos.ZERO); + rewrittenOperands.add(defaultArg); } else { rewrittenOperands.add(operand); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index d620741b330..fb4833f55b3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -279,5 +279,15 @@ public class BridgingSqlFunction extends SqlFunction { } return TableCharacteristic.builder(semantics).build(); } + + @Override + public boolean argumentMustBeScalar(int ordinal) { + final List<StaticArgument> args = typeInference.getStaticArguments().orElse(null); + if (args == null || ordinal >= args.size()) { + return true; + } + final StaticArgument arg = args.get(ordinal); + return !arg.is(StaticArgumentTrait.TABLE); + } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java index 8bb177b6535..3b66f91a34b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceReturnInference.java @@ -34,8 +34,6 @@ import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.type.SqlReturnTypeInference; -import javax.annotation.Nullable; - import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException; import static org.apache.flink.table.types.inference.TypeInferenceUtil.createUnexpectedException; @@ -65,15 +63,10 @@ public final class TypeInferenceReturnInference implements SqlReturnTypeInferenc } @Override - public RelDataType inferReturnType(SqlOperatorBinding opBinding) { - final CallContext callContext = - new OperatorBindingCallContext( - dataTypeFactory, - definition, - opBinding, - extractExpectedOutputType(opBinding)); + public RelDataType inferReturnType(SqlOperatorBinding binding) { + final CallContext callContext = createCallContext(binding); try { - return inferReturnTypeOrError(unwrapTypeFactory(opBinding), callContext); + return inferReturnTypeOrError(unwrapTypeFactory(binding), callContext); } catch (ValidationException e) { throw createInvalidCallException(callContext, e); } catch (Throwable t) { @@ -83,14 +76,22 @@ public final class TypeInferenceReturnInference implements SqlReturnTypeInferenc // -------------------------------------------------------------------------------------------- - private @Nullable RelDataType extractExpectedOutputType(SqlOperatorBinding opBinding) { - if (opBinding instanceof SqlCallBinding) { - final SqlCallBinding binding = (SqlCallBinding) opBinding; + private CallContext createCallContext(SqlOperatorBinding binding) { + if (binding instanceof SqlCallBinding) { + final SqlCallBinding callBinding = (SqlCallBinding) binding; final FlinkCalciteSqlValidator validator = - (FlinkCalciteSqlValidator) binding.getValidator(); - return validator.getExpectedOutputType(binding.getCall()).orElse(null); + (FlinkCalciteSqlValidator) callBinding.getValidator(); + final RelDataType expectedOutputType = + validator.getExpectedOutputType(callBinding.getCall()).orElse(null); + return new CallBindingCallContext( + dataTypeFactory, + definition, + callBinding, + expectedOutputType, + typeInference.getStaticArguments().orElse(null)); + } else { + return new OperatorBindingCallContext(dataTypeFactory, definition, binding, null); } - return null; } private RelDataType inferReturnTypeOrError( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java index 05cf3c948ed..2898565535d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.functions.sql; +import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding; + import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlKind; @@ -29,7 +31,10 @@ import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorScope; -/** Default operator has specified type. */ +/** + * Marker for optional arguments inserted by {@link FlinkSqlCallBinding}. Compared to Calcite, this + * operator stores its type. + */ public class SqlDefaultOperator extends SqlSpecialOperator { private final RelDataType returnType; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java index c5039da5730..8aeee6ca5c0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java @@ -45,6 +45,7 @@ import java.util.stream.Stream; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY; +import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH; import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW; import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -134,6 +135,18 @@ public class ProcessTableFunctionTest extends TableTestBase { "SELECT * FROM f(input => TABLE t1, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')"); } + @Test + void testTableAsSetPassThroughColumns() { + util.addTemporarySystemFunction("f", TableAsSetPassThroughFunction.class); + assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)"); + } + + @Test + void testTableAsRowPassThroughColumns() { + util.addTemporarySystemFunction("f", TableAsRowPassThroughFunction.class); + assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)"); + } + @ParameterizedTest @MethodSource("errorSpecs") void testErrorBehavior(ErrorSpec spec) { @@ -259,6 +272,18 @@ public class ProcessTableFunctionTest extends TableTestBase { public void eval(@ArgumentHint({TABLE_AS_SET, OPTIONAL_PARTITION_BY}) Row r, Integer i) {} } + /** Testing function. */ + public static class TableAsRowPassThroughFunction extends ProcessTableFunction<String> { + @SuppressWarnings("unused") + public void eval(@ArgumentHint({TABLE_AS_ROW, PASS_COLUMNS_THROUGH}) Row r, Integer i) {} + } + + /** Testing function. */ + public static class TableAsSetPassThroughFunction extends ProcessTableFunction<String> { + @SuppressWarnings("unused") + public void eval(@ArgumentHint({TABLE_AS_SET, PASS_COLUMNS_THROUGH}) Row r, Integer i) {} + } + /** Testing function. */ public static class NoProcessTableFunction extends TableFunction<String> {