alexey-lv commented on code in PR #23477:
URL: https://github.com/apache/flink/pull/23477#discussion_r1341861500
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java:
##########
@@ -321,4 +332,131 @@ protected void addToSelectList(
// Always add to list
super.addToSelectList(list, aliases, fieldList, exp, scope,
includeSystemVars);
}
+
+ @Override
+ protected @PolyNull SqlNode performUnconditionalRewrites(
+ @PolyNull SqlNode node, boolean underFrom) {
+
+ // Special case for window TVFs like:
+ // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE))
+ //
+ // "TABLE t" is translated into an implicit "SELECT * FROM t". This
would ignore columns
+ // that are not expanded by default. However, the descriptor
explicitly states the need
+ // for this column. Therefore, explicit table expressions (for window
TVFs at most one)
+ // are captured before rewriting and replaced with a "marker"
SqlSelect that contains the
+ // descriptor information. The "marker" SqlSelect is considered during
column expansion.
+ final List<SqlIdentifier> explicitTableArgs =
getExplicitTableOperands(node);
+
+ final SqlNode rewritten = super.performUnconditionalRewrites(node,
underFrom);
+
+ if (!(node instanceof SqlBasicCall)) {
+ return rewritten;
+ }
+ final SqlBasicCall call = (SqlBasicCall) node;
+ final SqlOperator operator = call.getOperator();
+
+ if (operator instanceof SqlWindowTableFunction) {
+ if (explicitTableArgs.stream().allMatch(Objects::isNull)) {
+ return rewritten;
+ }
+
+ final List<SqlIdentifier> descriptors =
+ call.getOperandList().stream()
+ .filter(op -> op.getKind() == SqlKind.DESCRIPTOR)
+ .flatMap(
+ desc ->
+ ((SqlBasicCall) desc)
+ .getOperandList().stream()
+
.filter(SqlIdentifier.class::isInstance)
+
.map(SqlIdentifier.class::cast))
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < call.operandCount(); i++) {
+ final SqlIdentifier tableArg = explicitTableArgs.get(i);
+ if (tableArg != null) {
+ call.setOperand(i, new ExplicitTableSqlSelect(tableArg,
descriptors));
+ }
+ }
+ }
+
+ return rewritten;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Column expansion
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * A special {@link SqlSelect} to capture the origin of a {@link
SqlKind#EXPLICIT_TABLE} within
+ * TVF operands.
+ */
+ private static class ExplicitTableSqlSelect extends SqlSelect {
+
+ private final List<SqlIdentifier> descriptors;
+
+ public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier>
descriptors) {
+ super(
+ SqlParserPos.ZERO,
+ null,
+ SqlNodeList.of(SqlIdentifier.star(SqlParserPos.ZERO)),
+ table,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ this.descriptors = descriptors;
+ }
+ }
+
+ /**
+ * Returns whether the given column has been declared in a {@link
SqlKind#DESCRIPTOR} next to a
+ * {@link SqlKind#EXPLICIT_TABLE} within TVF operands.
+ */
+ private static boolean declaredDescriptorColumn(SelectScope scope, Column
column) {
+ if (!(scope.getNode() instanceof ExplicitTableSqlSelect)) {
+ return false;
+ }
+ final ExplicitTableSqlSelect select = (ExplicitTableSqlSelect)
scope.getNode();
+ return select.descriptors.stream()
+ .map(SqlIdentifier::getSimple)
+ .anyMatch(id -> id.equals(column.getName()));
+ }
+
+ /**
+ * Returns all {@link SqlKind#EXPLICIT_TABLE} operands within TVF
operands. A list entry is
+ * {@code null} if the operand is not an {@link SqlKind#EXPLICIT_TABLE}.
+ */
+ private static List<SqlIdentifier> getExplicitTableOperands(SqlNode node) {
+ if (!(node instanceof SqlBasicCall)) {
+ return null;
+ }
+ final SqlBasicCall call = (SqlBasicCall) node;
+
+ if (!(call.getOperator() instanceof SqlFunction)) {
+ return null;
+ }
+ final SqlFunction function = (SqlFunction) call.getOperator();
+
+ if (function.getFunctionType() !=
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION) {
Review Comment:
Why are you looking only at UDFs (USER_DEFINED_TABLE_FUNCTION) here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]