This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e622205d0b7 [FLINK-34476][table-planner] Consider assignment operator
during TVF column expansion
e622205d0b7 is described below
commit e622205d0b74c6cbaf6fef6d8c11a397cdc30284
Author: Timo Walther <[email protected]>
AuthorDate: Thu Feb 22 10:56:36 2024 +0100
[FLINK-34476][table-planner] Consider assignment operator during TVF column
expansion
---
.../planner/calcite/FlinkCalciteSqlValidator.java | 63 +++++++++++++++-------
.../plan/stream/sql/ColumnExpansionTest.java | 28 ++++++++++
2 files changed, 72 insertions(+), 19 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index a8cd9265e69..f091ab3e70a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -42,6 +42,7 @@ import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
@@ -85,6 +86,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
import static
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn;
@@ -363,20 +365,23 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
final List<SqlIdentifier> descriptors =
call.getOperandList().stream()
- .filter(op -> op.getKind() == SqlKind.DESCRIPTOR)
- .flatMap(
- desc ->
- ((SqlBasicCall) desc)
- .getOperandList().stream()
-
.filter(SqlIdentifier.class::isInstance)
-
.map(SqlIdentifier.class::cast))
+
.flatMap(FlinkCalciteSqlValidator::extractDescriptors)
.collect(Collectors.toList());
for (int i = 0; i < call.operandCount(); i++) {
final SqlIdentifier tableArg = explicitTableArgs.get(i);
if (tableArg != null) {
- call.setOperand(i, new ExplicitTableSqlSelect(tableArg,
descriptors));
+ final SqlNode opReplacement = new
ExplicitTableSqlSelect(tableArg, descriptors);
+ if (call.operand(i).getKind() ==
SqlKind.ARGUMENT_ASSIGNMENT) {
+ // for TUMBLE(DATA => TABLE t3, ...)
+ final SqlCall assignment = call.operand(i);
+ assignment.setOperand(0, opReplacement);
+ } else {
+ // for TUMBLE(TABLE t3, ...)
+ call.setOperand(i, opReplacement);
+ }
}
+ // for TUMBLE([DATA =>] SELECT ..., ...)
}
}
@@ -447,20 +452,40 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
}
return call.getOperandList().stream()
- .map(
- op -> {
- if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
- final SqlBasicCall opCall = (SqlBasicCall) op;
- if (opCall.operandCount() == 1
- && opCall.operand(0) instanceof
SqlIdentifier) {
- return (SqlIdentifier) opCall.operand(0);
- }
- }
- return null;
- })
+ .map(FlinkCalciteSqlValidator::extractExplicitTable)
.collect(Collectors.toList());
}
+ private static @Nullable SqlIdentifier extractExplicitTable(SqlNode op) {
+ if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
+ final SqlBasicCall opCall = (SqlBasicCall) op;
+ if (opCall.operandCount() == 1 && opCall.operand(0) instanceof
SqlIdentifier) {
+ // for TUMBLE(TABLE t3, ...)
+ return opCall.operand(0);
+ }
+ } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
+ // for TUMBLE(DATA => TABLE t3, ...)
+ final SqlBasicCall opCall = (SqlBasicCall) op;
+ return extractExplicitTable(opCall.operand(0));
+ }
+ return null;
+ }
+
+ private static Stream<SqlIdentifier> extractDescriptors(SqlNode op) {
+ if (op.getKind() == SqlKind.DESCRIPTOR) {
+ // for TUMBLE(..., DESCRIPTOR(col), ...)
+ final SqlBasicCall opCall = (SqlBasicCall) op;
+ return opCall.getOperandList().stream()
+ .filter(SqlIdentifier.class::isInstance)
+ .map(SqlIdentifier.class::cast);
+ } else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
+ // for TUMBLE(..., TIMECOL => DESCRIPTOR(col), ...)
+ final SqlBasicCall opCall = (SqlBasicCall) op;
+ return extractDescriptors(opCall.operand(0));
+ }
+ return Stream.empty();
+ }
+
private static boolean isTableFunction(SqlFunction function) {
return function instanceof SqlTableFunction
|| function.getFunctionType() ==
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION;
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 7062d7ddd4a..f51da37fea8 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -289,4 +289,32 @@ class ColumnExpansionTest {
assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames())
.containsExactly(columnNames);
}
+
+ @Test
+ void testExplicitTableWithinTableFunctionWithNamedArgs() {
+ tableEnv.getConfig()
+ .set(
+ TABLE_COLUMN_EXPANSION_STRATEGY,
+
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+ // t3_m_virtual is selected due to expansion of the explicit table
expression
+ // with hints from descriptor
+ assertColumnNames(
+ "SELECT * FROM TABLE("
+ + "TUMBLE(DATA => TABLE t3, TIMECOL =>
DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE))",
+ "t3_s",
+ "t3_i",
+ "t3_m_virtual",
+ "window_start",
+ "window_end",
+ "window_time");
+
+ // Test common window TVF syntax
+ assertColumnNames(
+ "SELECT t3_s, SUM(t3_i) AS agg "
+ + "FROM TABLE(TUMBLE(DATA => TABLE t3, TIMECOL =>
DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE)) "
+ + "GROUP BY t3_s, window_start, window_end",
+ "t3_s",
+ "agg");
+ }
}