This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b25b57c55d9 [FLINK-33169] Consider descriptor information during
system column expansion
b25b57c55d9 is described below
commit b25b57c55d903e4fdd2b666de49c90bfbad8fa99
Author: Timo Walther <[email protected]>
AuthorDate: Fri Sep 29 15:25:17 2023 -0700
[FLINK-33169] Consider descriptor information during system column expansion
---
.../planner/calcite/FlinkCalciteSqlValidator.java | 150 ++++++++++++++++++++-
.../plan/stream/sql/ColumnExpansionTest.java | 38 ++++++
2 files changed, 182 insertions(+), 6 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index c5acdd99f54..0b0075a4f64 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -43,15 +43,20 @@ import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlSnapshot;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindowTableFunction;
+import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.DelegatingScope;
import org.apache.calcite.sql.validate.IdentifierNamespace;
import org.apache.calcite.sql.validate.IdentifierSnapshotNamespace;
@@ -67,6 +72,7 @@ import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.util.Static;
import org.apache.calcite.util.TimestampString;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.PolyNull;
import java.math.BigDecimal;
import java.time.ZoneId;
@@ -74,8 +80,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
import static
org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn;
@@ -95,6 +103,8 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
private final FrameworkConfig frameworkConfig;
+ private final List<ColumnExpansionStrategy> columnExpansionStrategies;
+
public FlinkCalciteSqlValidator(
SqlOperatorTable opTab,
SqlValidatorCatalogReader catalogReader,
@@ -107,6 +117,9 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
this.relOptCluster = relOptCluster;
this.toRelContext = toRelcontext;
this.frameworkConfig = frameworkConfig;
+ this.columnExpansionStrategies =
+ ShortcutUtils.unwrapTableConfig(relOptCluster)
+
.get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY);
}
public void setExpectedOutputType(SqlNode sqlNode, RelDataType
expectedOutputType) {
@@ -145,7 +158,7 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
SqlNode operand0 = call.operand(0);
if (operand0 instanceof SqlBasicCall
&& ((SqlBasicCall) operand0).getOperator()
- instanceof SqlWindowTableFunction) {
+ instanceof
org.apache.calcite.sql.SqlWindowTableFunction) {
return;
}
}
@@ -295,11 +308,8 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
SqlNode exp,
SelectScope scope,
boolean includeSystemVars) {
- final List<ColumnExpansionStrategy> strategies =
- ShortcutUtils.unwrapTableConfig(relOptCluster)
-
.get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY);
// Extract column's origin to apply strategy
- if (!strategies.isEmpty() && exp instanceof SqlIdentifier) {
+ if (!columnExpansionStrategies.isEmpty() && exp instanceof
SqlIdentifier) {
final SqlQualified qualified = scope.fullyQualify((SqlIdentifier)
exp);
if (qualified.namespace != null && qualified.namespace.getTable()
!= null) {
final CatalogSchemaTable schemaTable =
@@ -309,7 +319,8 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
final String columnName = qualified.suffix().get(0);
final Column column =
resolvedSchema.getColumn(columnName).orElse(null);
if (qualified.suffix().size() == 1 && column != null) {
- if (includeExpandedColumn(column, strategies)) {
+ if (includeExpandedColumn(column,
columnExpansionStrategies)
+ || declaredDescriptorColumn(scope, column)) {
super.addToSelectList(
list, aliases, fieldList, exp, scope,
includeSystemVars);
}
@@ -321,4 +332,131 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
// Always add to list
super.addToSelectList(list, aliases, fieldList, exp, scope,
includeSystemVars);
}
+
+    @Override
+    protected @PolyNull SqlNode performUnconditionalRewrites(
+            @PolyNull SqlNode node, boolean underFrom) {
+
+        // Special case for window TVFs like:
+        // TUMBLE(TABLE t, DESCRIPTOR(metadata_virtual), INTERVAL '1' MINUTE))
+        //
+        // "TABLE t" is translated into an implicit "SELECT * FROM t". This would ignore columns
+        // that are not expanded by default. However, the descriptor explicitly states the need
+        // for this column. Therefore, explicit table expressions (for window TVFs at most one)
+        // are captured before rewriting and replaced with a "marker" SqlSelect that contains the
+        // descriptor information. The "marker" SqlSelect is considered during column expansion.
+        //
+        // Capturing has to happen before calling super, because the rewrite replaces the
+        // "TABLE t" operands and their table identifiers are no longer reachable afterwards.
+        final List<SqlIdentifier> explicitTableArgs = getExplicitTableOperands(node);
+
+        final SqlNode rewritten = super.performUnconditionalRewrites(node, underFrom);
+
+        // Inspect and modify the node that is actually returned, so the marker operands are
+        // not lost if the rewrite produced a new call instead of updating "node" in place.
+        if (!(rewritten instanceof SqlBasicCall)) {
+            return rewritten;
+        }
+        final SqlBasicCall call = (SqlBasicCall) rewritten;
+        if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
+            return rewritten;
+        }
+
+        // getExplicitTableOperands may yield null for calls that were not USER_DEFINED_TABLE_FUNCTION
+        // calls before the rewrite (presumably e.g. an operator that was already resolved).
+        // The captured list is also only usable positionally if the operand count is unchanged.
+        if (explicitTableArgs == null
+                || explicitTableArgs.size() != call.operandCount()
+                || explicitTableArgs.stream().allMatch(Objects::isNull)) {
+            return rewritten;
+        }
+
+        // Collect every simple identifier listed in any DESCRIPTOR(...) operand of the call.
+        final List<SqlIdentifier> descriptors =
+                call.getOperandList().stream()
+                        .filter(op -> op != null && op.getKind() == SqlKind.DESCRIPTOR)
+                        .filter(SqlBasicCall.class::isInstance)
+                        .map(SqlBasicCall.class::cast)
+                        .flatMap(desc -> desc.getOperandList().stream())
+                        .filter(SqlIdentifier.class::isInstance)
+                        .map(SqlIdentifier.class::cast)
+                        .collect(Collectors.toList());
+
+        // Replace each rewritten "TABLE t" operand with a marker select carrying the descriptors.
+        for (int i = 0; i < call.operandCount(); i++) {
+            final SqlIdentifier tableArg = explicitTableArgs.get(i);
+            if (tableArg != null) {
+                call.setOperand(i, new ExplicitTableSqlSelect(tableArg, descriptors));
+            }
+        }
+
+        return rewritten;
+    }
+
+ //
--------------------------------------------------------------------------------------------
+ // Column expansion
+ //
--------------------------------------------------------------------------------------------
+
+    /**
+     * Marker {@link SqlSelect} standing in for a {@link SqlKind#EXPLICIT_TABLE} ({@code TABLE t})
+     * operand of a TVF call. It has the shape of {@code SELECT * FROM t} but additionally keeps
+     * the identifiers declared in the call's {@link SqlKind#DESCRIPTOR} operands, so that column
+     * expansion can tell which columns the descriptor explicitly requires.
+     */
+    private static final class ExplicitTableSqlSelect extends SqlSelect {
+
+        /** Identifiers collected from the DESCRIPTOR operands of the enclosing TVF call. */
+        private final List<SqlIdentifier> descriptors;
+
+        public ExplicitTableSqlSelect(SqlIdentifier table, List<SqlIdentifier> descriptors) {
+            super(
+                    SqlParserPos.ZERO,
+                    null, // no keyword list
+                    starSelectList(),
+                    table, // FROM clause
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null);
+            this.descriptors = descriptors;
+        }
+
+        /** Builds the single-entry select list {@code *} of the implicit {@code SELECT *}. */
+        private static SqlNodeList starSelectList() {
+            return SqlNodeList.of(SqlIdentifier.star(SqlParserPos.ZERO));
+        }
+    }
+
+    /**
+     * Returns whether the given column has been declared in a {@link SqlKind#DESCRIPTOR} next to a
+     * {@link SqlKind#EXPLICIT_TABLE} within TVF operands.
+     *
+     * <p>Only scopes whose node is an {@link ExplicitTableSqlSelect} marker can match. Only
+     * single-part descriptor identifiers are compared: {@link SqlIdentifier#getSimple()} requires a
+     * simple identifier, so compound names (e.g. {@code DESCRIPTOR(t.col)}) are skipped here and
+     * left to regular validation instead of failing inside column expansion.
+     *
+     * @param scope the select scope whose star is currently being expanded
+     * @param column the resolved column that is a candidate for expansion
+     * @return {@code true} if a descriptor of the marker select names the column exactly
+     */
+    private static boolean declaredDescriptorColumn(SelectScope scope, Column column) {
+        final SqlSelect select = scope.getNode();
+        if (!(select instanceof ExplicitTableSqlSelect)) {
+            return false;
+        }
+        final String columnName = column.getName();
+        return ((ExplicitTableSqlSelect) select)
+                .descriptors.stream()
+                        .filter(SqlIdentifier::isSimple)
+                        .map(SqlIdentifier::getSimple)
+                        .anyMatch(columnName::equals);
+    }
+
+ /**
+ * Returns all {@link SqlKind#EXPLICIT_TABLE} operands within TVF
operands. A list entry is
+ * {@code null} if the operand is not an {@link SqlKind#EXPLICIT_TABLE}.
+ */
+ private static List<SqlIdentifier> getExplicitTableOperands(SqlNode node) {
+ if (!(node instanceof SqlBasicCall)) {
+ return null;
+ }
+ final SqlBasicCall call = (SqlBasicCall) node;
+
+ if (!(call.getOperator() instanceof SqlFunction)) {
+ return null;
+ }
+ final SqlFunction function = (SqlFunction) call.getOperator();
+
+ if (function.getFunctionType() !=
SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION) {
+ return null;
+ }
+
+ return call.getOperandList().stream()
+ .map(
+ op -> {
+ if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
+ final SqlBasicCall opCall = (SqlBasicCall) op;
+ if (opCall.operandCount() == 1
+ && opCall.operand(0) instanceof
SqlIdentifier) {
+ return (SqlIdentifier) opCall.operand(0);
+ }
+ }
+ return null;
+ })
+ .collect(Collectors.toList());
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
index 43cccf6e7f5..290b981583e 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ColumnExpansionTest.java
@@ -68,6 +68,17 @@ public class ColumnExpansionTest {
+ " 'readable-metadata' =
't2_m_virtual:INT,k1:STRING,t2_m_default:INT,k2:STRING'\n"
+ ")");
+ tableEnv.executeSql(
+ "CREATE TABLE t3 (\n"
+ + " t3_s STRING,\n"
+ + " t3_i INT,\n"
+ + " t3_m_virtual TIMESTAMP_LTZ(3) METADATA VIRTUAL,\n"
+ + " WATERMARK FOR t3_m_virtual AS t3_m_virtual -
INTERVAL '1' SECOND\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'readable-metadata' =
't3_m_virtual:TIMESTAMP_LTZ(3)'\n"
+ + ")");
+
tableEnv.getConfig().set(TABLE_COLUMN_EXPANSION_STRATEGY,
Collections.emptyList());
}
@@ -222,6 +233,33 @@ public class ColumnExpansionTest {
assertColumnNames("SELECT * FROM v1", "t1_i", "t1_s", "t1_m_default",
"t1_m_aliased");
}
+    @Test
+    public void testExplicitTableWithinTableFunction() {
+        tableEnv.getConfig()
+                .set(
+                        TABLE_COLUMN_EXPANSION_STRATEGY,
+                        Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));
+
+        // Baseline: without a descriptor referencing it, t3_m_virtual stays excluded by the
+        // configured strategy (guards against the descriptor hint leaking outside of TVFs)
+        assertColumnNames("SELECT * FROM t3", "t3_s", "t3_i");
+
+        // t3_m_virtual is selected due to expansion of the explicit table expression
+        // with hints from descriptor
+        assertColumnNames(
+                "SELECT * FROM TABLE(TUMBLE(TABLE t3, DESCRIPTOR(t3_m_virtual), INTERVAL '1' MINUTE))",
+                "t3_s",
+                "t3_i",
+                "t3_m_virtual",
+                "window_start",
+                "window_end",
+                "window_time");
+
+        // Test common window TVF syntax
+        assertColumnNames(
+                "SELECT t3_s, SUM(t3_i) AS agg "
+                        + "FROM TABLE(TUMBLE(TABLE t3, DESCRIPTOR(t3_m_virtual), INTERVAL '1' MINUTE)) "
+                        + "GROUP BY t3_s, window_start, window_end",
+                "t3_s",
+                "agg");
+    }
+
private void assertColumnNames(String sql, String... columnNames) {
assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames())
.containsExactly(columnNames);