This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ccfd8be19ba [FLINK-31836][table] Upgrade Calcite version to 1.34.0
ccfd8be19ba is described below
commit ccfd8be19ba1d38e5cd6fb5c7e18dc07019375b5
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Nov 25 12:53:56 2024 +0100
[FLINK-31836][table] Upgrade Calcite version to 1.34.0
---
.../docs/dev/table/sql/queries/deduplication.md | 8 +-
docs/content.zh/docs/dev/table/sql/queries/topn.md | 12 +-
.../docs/dev/table/concepts/versioned_tables.md | 8 +-
.../docs/dev/table/sql/queries/deduplication.md | 8 +-
docs/content/docs/dev/table/sql/queries/topn.md | 12 +-
flink-table/flink-sql-parser/pom.xml | 4 +-
.../src/main/codegen/data/Parser.tdd | 2 +
.../src/main/codegen/templates/Parser.jj | 148 +++++++++++-
flink-table/flink-table-calcite-bridge/pom.xml | 8 +-
.../calcite/sql/validate/SqlValidatorImpl.java | 253 +++++++++++++++------
.../apache/calcite/sql2rel/RelDecorrelator.java | 20 +-
.../apache/calcite/sql2rel/SqlToRelConverter.java | 176 +++++++++++---
.../calcite/sql2rel/StandardConvertletTable.java | 123 +++++++---
.../java/org/apache/calcite/tools/RelBuilder.java | 2 +-
.../src/main/resources/META-INF/NOTICE | 4 +-
.../runtime/batch/sql/OverAggregateITCase.scala | 42 ++++
.../planner/runtime/batch/sql/RankITCase.scala | 19 ++
flink-table/pom.xml | 4 +-
18 files changed, 663 insertions(+), 190 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/sql/queries/deduplication.md b/docs/content.zh/docs/dev/table/sql/queries/deduplication.md
index b81c49213c4..3368f0ced5f 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/deduplication.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/deduplication.md
@@ -33,12 +33,8 @@ Flink uses `ROW_NUMBER()` to remove duplicate data, just like a Top-N query. Its
```sql
SELECT [column_list]
-FROM (
- SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
- ORDER BY time_attr [asc|desc]) AS rownum
- FROM table_name)
-WHERE rownum = 1
+FROM table_name
+QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) = 1
```
**Parameter Specification:**
diff --git a/docs/content.zh/docs/dev/table/sql/queries/topn.md b/docs/content.zh/docs/dev/table/sql/queries/topn.md
index 7339ca3b590..131895597c9 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/topn.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/topn.md
@@ -32,13 +32,11 @@ Flink uses the combination of an `OVER` window clause and a filter condition to express a Top-N
The following shows the syntax of Top-N:
```sql
-SELECT [column_list]
-FROM (
- SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
- ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
- FROM table_name)
-WHERE rownum <= N [AND conditions]
+SELECT [column_list],
+ ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
+FROM table_name
+[WHERE conditions]
+QUALIFY rownum <= N
```
**参数说明:**
diff --git a/docs/content/docs/dev/table/concepts/versioned_tables.md b/docs/content/docs/dev/table/concepts/versioned_tables.md
index 4cf494898c3..d42bb110eca 100644
--- a/docs/content/docs/dev/table/concepts/versioned_tables.md
+++ b/docs/content/docs/dev/table/concepts/versioned_tables.md
@@ -164,12 +164,8 @@ In general, the results of a query with the following format produces a versione
```sql
SELECT [column_list]
-FROM (
- SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
- ORDER BY time_attr DESC) AS rownum
- FROM table_name)
-WHERE rownum = 1
+FROM table_name
+QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr DESC) = 1
```
**Parameter Specification:**
diff --git a/docs/content/docs/dev/table/sql/queries/deduplication.md b/docs/content/docs/dev/table/sql/queries/deduplication.md
index 18b517e6e56..08a02c1c8ba 100644
--- a/docs/content/docs/dev/table/sql/queries/deduplication.md
+++ b/docs/content/docs/dev/table/sql/queries/deduplication.md
@@ -33,12 +33,8 @@ The following shows the syntax of the Deduplication statement:
```sql
SELECT [column_list]
-FROM (
- SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
- ORDER BY time_attr [asc|desc]) AS rownum
- FROM table_name)
-WHERE rownum = 1
+FROM table_name
+QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) = 1
```
**Parameter Specification:**
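The deduplication syntax above keeps, within each partition, the row that `ROW_NUMBER()` numbers 1. As a hedged illustration of the result only (not of how Flink's runtime operator computes it), the plain-Java sketch below applies the same rule to an in-memory list. The class `DedupSketch`, the `Row` type and its fields are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of: QUALIFY ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts [ASC|DESC]) = 1 */
public class DedupSketch {

    /** A hypothetical input row: partition key, time attribute and payload. */
    static final class Row {
        final String key;
        final long ts;
        final String value;

        Row(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "@" + ts + "=" + value;
        }
    }

    /** Keeps, per key, the row that would receive row number 1. */
    static List<Row> dedup(List<Row> input, boolean descending) {
        Comparator<Row> byTime = Comparator.comparingLong(r -> r.ts);
        if (descending) {
            byTime = byTime.reversed();
        }
        Map<String, Row> firstPerKey = new LinkedHashMap<>();
        for (Row row : input) {
            Row current = firstPerKey.get(row.key);
            // Replace the kept row only if the new one sorts strictly earlier;
            // on ties the earlier input row wins (SQL leaves tie order unspecified).
            if (current == null || byTime.compare(row, current) < 0) {
                firstPerKey.put(row.key, row);
            }
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("a", 1, "first a"),
                new Row("b", 5, "only b"),
                new Row("a", 3, "second a"));
        System.out.println(dedup(rows, false)); // ASC: earliest row per key
        System.out.println(dedup(rows, true)); // DESC: latest row per key
    }
}
```

With `descending = true` this is the "latest row per key" shape that a versioned view relies on; with `false` it keeps the first row per key.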
diff --git a/docs/content/docs/dev/table/sql/queries/topn.md b/docs/content/docs/dev/table/sql/queries/topn.md
index d4652b7a1f7..e374c3cca4e 100644
--- a/docs/content/docs/dev/table/sql/queries/topn.md
+++ b/docs/content/docs/dev/table/sql/queries/topn.md
@@ -33,13 +33,11 @@ Flink uses the combination of a OVER window clause and a filter condition to exp
The following shows the syntax of the Top-N statement:
```sql
-SELECT [column_list]
-FROM (
- SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
- ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
- FROM table_name)
-WHERE rownum <= N [AND conditions]
+SELECT [column_list],
+ ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
+FROM table_name
+[WHERE conditions]
+QUALIFY rownum <= N
```
**Parameter Specification:**
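To make the evaluation order of the Top-N statement concrete, the plain-Java sketch below computes the same result on an in-memory list: the WHERE predicate runs before any row is numbered, and the `rownum <= N` test from QUALIFY runs afterward. It is a hedged model of the query's semantics, not Flink's Top-N operator; `TopNSketch`, `Row`, and the `category`/`product`/`sales` fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical model of:
 *   SELECT ..., ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum
 *   FROM t WHERE <where> QUALIFY rownum <= n
 */
public class TopNSketch {

    /** A hypothetical input row. */
    static final class Row {
        final String category;
        final String product;
        final long sales;

        Row(String category, String product, long sales) {
            this.category = category;
            this.product = product;
            this.sales = sales;
        }
    }

    static List<Row> topN(List<Row> input, Predicate<Row> where, int n) {
        // 1. WHERE filters rows before they are numbered; survivors are grouped by partition key.
        Map<String, List<Row>> partitions = new LinkedHashMap<>();
        for (Row row : input) {
            if (where.test(row)) {
                partitions.computeIfAbsent(row.category, k -> new ArrayList<>()).add(row);
            }
        }
        List<Row> result = new ArrayList<>();
        for (List<Row> partition : partitions.values()) {
            // 2. ORDER BY sales DESC within the partition (List.sort is stable on ties).
            partition.sort(Comparator.comparingLong((Row r) -> r.sales).reversed());
            // 3. QUALIFY rownum <= n: rownum starts at 1, so keep the first n rows.
            result.addAll(partition.subList(0, Math.min(n, partition.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("books", "b1", 10),
                new Row("books", "b2", 30),
                new Row("books", "b3", 20),
                new Row("games", "g1", 5));
        for (Row top : topN(rows, row -> true, 2)) {
            System.out.println(top.category + " " + top.product + " " + top.sales);
        }
    }
}
```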
diff --git a/flink-table/flink-sql-parser/pom.xml b/flink-table/flink-sql-parser/pom.xml
index 4e7c463db78..866fa4f97e3 100644
--- a/flink-table/flink-sql-parser/pom.xml
+++ b/flink-table/flink-sql-parser/pom.xml
@@ -62,9 +62,9 @@ under the License.
<version>${calcite.version}</version>
<exclusions>
<!--
- "mvn dependency:tree" as of Calcite 1.33.0:
+ "mvn dependency:tree" as of Calcite 1.34.0:
- [INFO] +- org.apache.calcite:calcite-core:jar:1.33.0:compile
+ [INFO] +- org.apache.calcite:calcite-core:jar:1.34.0:compile
[INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.23.0:compile
[INFO] | +- org.apiguardian:apiguardian-api:jar:1.1.2:compile
[INFO] | +- org.checkerframework:checker-qual:jar:3.12.0:compile
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 2a366116e93..ff10d47109a 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -287,6 +287,8 @@
"CURSOR_NAME"
"DATA"
"DATABASE"
+ "DATE_DIFF"
+ "DATETIME_DIFF"
"DATETIME_INTERVAL_CODE"
"DATETIME_INTERVAL_PRECISION"
"DAYS"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
index 57c5eec2373..9de00ec3dcb 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
+++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
@@ -1311,6 +1311,7 @@ SqlSelect SqlSelect() :
final SqlNodeList groupBy;
final SqlNode having;
final SqlNodeList windowDecls;
+ final SqlNode qualify;
final List<SqlNode> hints = new ArrayList<SqlNode>();
final Span s;
}
@@ -1337,6 +1338,7 @@ SqlSelect SqlSelect() :
( groupBy = GroupBy() | { groupBy = null; } )
( having = Having() | { having = null; } )
( windowDecls = Window() | { windowDecls = null; } )
+ ( qualify = Qualify() | { qualify = null; } )
|
E() {
fromClause = null;
@@ -1344,13 +1346,14 @@ SqlSelect SqlSelect() :
groupBy = null;
having = null;
windowDecls = null;
+ qualify = null;
}
)
{
return new SqlSelect(s.end(this), keywordList,
new SqlNodeList(selectList, Span.of(selectList).pos()),
- fromClause, where, groupBy, having, windowDecls, null, null, null,
- new SqlNodeList(hints, getPos()));
+ fromClause, where, groupBy, having, windowDecls, qualify,
+ null, null, null, new SqlNodeList(hints, getPos()));
}
}
@@ -2773,6 +2776,15 @@ SqlNode WindowRange() :
)
}
+/** Parses a QUALIFY clause for SELECT. */
+SqlNode Qualify() :
+{
+ SqlNode e;
+}
+{
+ <QUALIFY> e = Expression(ExprContext.ACCEPT_SUB_QUERY) { return e; }
+}
+
/**
* Parses an ORDER BY clause.
*/
@@ -4033,6 +4045,7 @@ SqlNode AtomicRowExpression() :
}
{
(
+ LOOKAHEAD(2)
e = LiteralOrIntervalExpression()
|
e = DynamicParam()
@@ -4629,6 +4642,28 @@ SqlLiteral DateTimeLiteral() :
}
}
+/** Parses a Date/Time constructor function, for example "DATE(1969, 7, 21)"
+ * or "DATETIME(d, t)". Enabled in some libraries (e.g. BigQuery). */
+SqlNode DateTimeConstructorCall() :
+{
+ final SqlFunctionCategory funcType = SqlFunctionCategory.TIMEDATE;
+ final SqlIdentifier qualifiedName;
+ final Span s;
+ final SqlLiteral quantifier;
+ final List<? extends SqlNode> args;
+}
+{
+ (<DATE> | <TIME> | <DATETIME> | <TIMESTAMP>) {
+ s = span();
+ qualifiedName = new SqlIdentifier(unquotedIdentifier(), getPos());
+ }
+ args = FunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+ quantifier = (SqlLiteral) args.get(0);
+ args.remove(0);
+ return createCall(qualifiedName, s.end(this), funcType, quantifier, args);
+ }
+}
+
/** Parses a MULTISET constructor */
SqlNode MultisetConstructor() :
{
@@ -5047,13 +5082,42 @@ SqlIntervalQualifier IntervalQualifierStart() :
* registered as abbreviations in your time frame set.
*/
SqlIntervalQualifier TimeUnitOrName() : {
- final Span span;
- final String w;
- final TimeUnit unit;
final SqlIdentifier unitName;
+ final SqlIntervalQualifier intervalQualifier;
}
{
+ // When we see a time unit that is also a non-reserved keyword, such as
+ // NANOSECOND, there is a choice between using the TimeUnit enum
+ // (TimeUnit.NANOSECOND) or the name. The following LOOKAHEAD directive
+ // tells the parser that we prefer the former.
+ //
+ // Reserved keywords, such as SECOND, cannot be identifiers, and are
+ // therefore not ambiguous.
LOOKAHEAD(2)
+ intervalQualifier = TimeUnit() {
+ return intervalQualifier;
+ }
+| unitName = SimpleIdentifier() {
+ return new SqlIntervalQualifier(unitName.getSimple(),
+ unitName.getParserPosition());
+ }
+}
+
+/** Parses a built-in time unit (e.g. "YEAR")
+ * and returns a {@link SqlIntervalQualifier}.
+ *
+ * <p>Includes {@code WEEK} and {@code WEEK(SUNDAY)} through {@code WEEK(SATURDAY)}.
+ *
+ * <p>Does not include SQL_TSI_DAY, SQL_TSI_FRAC_SECOND etc. These will be
+ * parsed as identifiers and can be resolved in the validator if they are
+ * registered as abbreviations in your time frame set.
+ */
+SqlIntervalQualifier TimeUnit() : {
+ final Span span;
+ final String w;
+}
+{
<NANOSECOND> { return new SqlIntervalQualifier(TimeUnit.NANOSECOND, null, getPos()); }
| <MICROSECOND> { return new SqlIntervalQualifier(TimeUnit.MICROSECOND, null, getPos()); }
| <MILLISECOND> { return new SqlIntervalQualifier(TimeUnit.MILLISECOND, null, getPos()); }
@@ -5067,6 +5131,8 @@ SqlIntervalQualifier TimeUnitOrName() : {
| <ISOYEAR> { return new SqlIntervalQualifier(TimeUnit.ISOYEAR, null, getPos()); }
| <WEEK> { span = span(); }
(
+ // There is a choice between "WEEK(weekday)" and "WEEK". We prefer
+ // the former, and the parser will look ahead for '('.
LOOKAHEAD(2)
<LPAREN> w = weekdayName() <RPAREN> {
return new SqlIntervalQualifier(w, span.end(this));
@@ -5081,10 +5147,6 @@ SqlIntervalQualifier TimeUnitOrName() : {
| <DECADE> { return new SqlIntervalQualifier(TimeUnit.DECADE, null, getPos()); }
| <CENTURY> { return new SqlIntervalQualifier(TimeUnit.CENTURY, null, getPos()); }
| <MILLENNIUM> { return new SqlIntervalQualifier(TimeUnit.MILLENNIUM, null, getPos()); }
-| unitName = SimpleIdentifier() {
- return new SqlIntervalQualifier(unitName.getSimple(),
- unitName.getParserPosition());
- }
}
String weekdayName() :
@@ -6085,10 +6147,16 @@ SqlNode BuiltinFunctionCall() :
<RPAREN> {
return SqlStdOperatorTable.TRIM.createCall(s.end(this), args);
}
+ |
+ node = DateTimeConstructorCall() { return node; }
+ |
+ node = DateDiffFunctionCall() { return node; }
|
node = DateTruncFunctionCall() { return node; }
|
node = TimestampAddFunctionCall() { return node; }
+ |
+ node = DatetimeDiffFunctionCall() { return node; }
|
node = TimestampDiffFunctionCall() { return node; }
|
@@ -6603,6 +6671,28 @@ SqlCall JsonArrayAggFunctionCall() :
}
}
+/**
+ * Parses a call to BigQuery's DATE_DIFF.
+ */
+SqlCall DateDiffFunctionCall() :
+{
+ final List<SqlNode> args = new ArrayList<SqlNode>();
+ final Span s;
+ final SqlIntervalQualifier unit;
+}
+{
+ <DATE_DIFF> { s = span(); }
+ <LPAREN>
+ AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+ <COMMA>
+ AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+ <COMMA>
+ unit = TimeUnitOrName() { args.add(unit); }
+ <RPAREN> {
+ return SqlLibraryOperators.DATE_DIFF.createCall(s.end(this), args);
+ }
+}
+
/**
* Parses a call to TIMESTAMPADD.
*/
@@ -6676,6 +6766,28 @@ SqlCall TimestampDiff3FunctionCall() :
}
}
+/**
+ * Parses BigQuery's built-in DATETIME_DIFF() function.
+ */
+SqlCall DatetimeDiffFunctionCall() :
+{
+ final List<SqlNode> args = new ArrayList<SqlNode>();
+ final Span s;
+ final SqlIntervalQualifier unit;
+}
+{
+ <DATETIME_DIFF> { s = span(); }
+ <LPAREN>
+ AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+ <COMMA>
+ AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+ <COMMA>
+ unit = TimeUnitOrName() { args.add(unit); }
+ <RPAREN> {
+ return SqlLibraryOperators.DATETIME_DIFF.createCall(s.end(this), args);
+ }
+}
+
/**
* Parses a call to DATE_TRUNC.
*/
@@ -6694,7 +6806,8 @@ SqlCall DateTruncFunctionCall() :
// the BigQuery variant, e.g. "DATE_TRUNC(d, YEAR)",
// and the Redshift variant, e.g. "DATE_TRUNC('year', DATE '2008-09-08')".
(
- unit = TimeUnitOrName() { args.add(unit); }
+ LOOKAHEAD(2)
+ unit = TimeUnit() { args.add(unit); }
|
AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
)
@@ -7252,6 +7365,18 @@ SqlNode JdbcFunctionCall() :
s = span();
}
(
+ LOOKAHEAD(1)
+ call = DateDiffFunctionCall() {
+ name = call.getOperator().getName();
+ args = new SqlNodeList(call.getOperandList(), getPos());
+ }
+ |
+ LOOKAHEAD(1)
+ call = DatetimeDiffFunctionCall() {
+ name = call.getOperator().getName();
+ args = new SqlNodeList(call.getOperandList(), getPos());
+ }
+ |
LOOKAHEAD(1)
call = TimestampAddFunctionCall() {
name = call.getOperator().getName();
@@ -7658,8 +7783,10 @@ SqlPostfixOperator PostfixRowOperator() :
| < DATE: "DATE" >
| < DATE_TRUNC: "DATE_TRUNC" >
| < DATETIME: "DATETIME" >
+| < DATETIME_DIFF: "DATETIME_DIFF" >
| < DATETIME_INTERVAL_CODE: "DATETIME_INTERVAL_CODE" >
| < DATETIME_INTERVAL_PRECISION: "DATETIME_INTERVAL_PRECISION" >
+| < DATE_DIFF: "DATE_DIFF" >
| < DAY: "DAY" >
| < DAYS: "DAYS" >
| < DEALLOCATE: "DEALLOCATE" >
@@ -7958,6 +8085,7 @@ SqlPostfixOperator PostfixRowOperator() :
| < PRIVILEGES: "PRIVILEGES" >
| < PROCEDURE: "PROCEDURE" >
| < PUBLIC: "PUBLIC" >
+| < QUALIFY: "QUALIFY" >
| < QUARTER: "QUARTER" >
| < QUARTERS: "QUARTERS" >
| < RANGE: "RANGE" >
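The grammar change splits `TimeUnitOrName()` into a `TimeUnit()` production for built-in units and a `SimpleIdentifier()` fallback, and uses `LOOKAHEAD(2)` so that a word such as `NANOSECOND`, which is both a time unit and a non-reserved keyword, resolves to the built-in unit. The plain-Java sketch below models only that preference order; it is not JavaCC and not Calcite's code. The enum subset, the `Qualifier` class, and case-insensitive matching of unquoted words are assumptions made for illustration.

```java
import java.util.Locale;

/**
 * Hypothetical model of TimeUnitOrName(): a word naming a built-in unit becomes
 * that unit; any other word is kept as a name to be resolved later (for example
 * against a time frame set in the validator).
 */
public class TimeUnitResolutionSketch {

    /** A subset of built-in units, for illustration only. */
    enum BuiltInUnit {
        NANOSECOND, MICROSECOND, MILLISECOND, SECOND, MINUTE, HOUR,
        DAY, WEEK, MONTH, QUARTER, YEAR, DECADE, CENTURY, MILLENNIUM
    }

    /** Either a built-in unit or an unresolved custom name, never both. */
    static final class Qualifier {
        final BuiltInUnit unit; // null for custom names
        final String name; // null for built-in units

        Qualifier(BuiltInUnit unit, String name) {
            this.unit = unit;
            this.name = name;
        }

        @Override
        public String toString() {
            return unit != null ? "unit " + unit : "name " + name;
        }
    }

    static Qualifier resolve(String word) {
        String upper = word.toUpperCase(Locale.ROOT);
        // Preferred alternative: a built-in unit (mirrors TimeUnit() being tried first).
        for (BuiltInUnit unit : BuiltInUnit.values()) {
            if (unit.name().equals(upper)) {
                return new Qualifier(unit, null);
            }
        }
        // Fallback alternative: keep the word as a name (mirrors SimpleIdentifier()).
        return new Qualifier(null, word);
    }

    public static void main(String[] args) {
        System.out.println(resolve("nanosecond")); // unit NANOSECOND
        System.out.println(resolve("SQL_TSI_DAY")); // name SQL_TSI_DAY
    }
}
```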
diff --git a/flink-table/flink-table-calcite-bridge/pom.xml b/flink-table/flink-table-calcite-bridge/pom.xml
index 3c57ecbaab1..9b081d688ed 100644
--- a/flink-table/flink-table-calcite-bridge/pom.xml
+++ b/flink-table/flink-table-calcite-bridge/pom.xml
@@ -45,9 +45,9 @@ under the License.
<version>${calcite.version}</version>
<exclusions>
<!--
- "mvn dependency:tree" as of Calcite 1.33.0:
- [INFO] +- org.apache.calcite:calcite-core:jar:1.33.0:compile
- [INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.33.0:compile
+ "mvn dependency:tree" as of Calcite 1.34.0:
+ [INFO] +- org.apache.calcite:calcite-core:jar:1.34.0:compile
+ [INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.34.0:compile
[INFO] | +- org.locationtech.jts:jts-core:jar:1.19.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.15.3:compile
[INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.23.0:compile
@@ -60,7 +60,7 @@ under the License.
[INFO] | | \- org.ow2.asm:asm:jar:9.1:runtime
[INFO] | +- commons-codec:commons-codec:jar:1.15:runtime
[INFO] | +- org.apache.commons:commons-math3:jar:3.6.1:runtime
- [INFO] | \- commons-io:commons-io:jar:2.11.0:runtime
+ [INFO] | \- commons-io:commons-io:jar:2.15.1:runtime
Dependencies that are not needed for how we use
Calcite right now.
-->
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 097a9f2316f..d878af1c7f7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -499,9 +499,12 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
inferUnknownTypes(targetType, selectScope, expanded);
RelDataType type = deriveType(selectScope, expanded);
- // Re-derive SELECT ITEM's data type that may be nullable in AggregatingSelectScope when it
- // appears in advanced grouping elements such as CUBE, ROLLUP , GROUPING SETS.
- // For example, SELECT CASE WHEN c = 1 THEN '1' ELSE '23' END AS x FROM t GROUP BY CUBE(x),
+ // Re-derive SELECT ITEM's data type that may be nullable in
+ // AggregatingSelectScope when it appears in advanced grouping elements such
+ // as CUBE, ROLLUP, GROUPING SETS. For example, in
+ // SELECT CASE WHEN c = 1 THEN '1' ELSE '23' END AS x
+ // FROM t
+ // GROUP BY CUBE(x)
// the 'x' should be nullable even if x's literal values are not null.
if (selectScope instanceof AggregatingSelectScope) {
type = requireNonNull(selectScope.nullifyType(stripAs(expanded), type));
@@ -1459,6 +1462,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
null,
null,
null,
+ null,
orderList,
orderBy.offset,
orderBy.fetch,
@@ -1483,6 +1487,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
null,
null,
null,
+ null,
null);
}
@@ -1590,6 +1595,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
null,
null,
null,
+ null,
null);
call.setSourceSelect(select);
@@ -1616,6 +1622,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
null,
null,
null,
+ null,
null);
insertCall.setSource(select);
}
@@ -1678,6 +1685,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
null,
null,
null,
+ null,
null);
source = SqlValidatorUtil.addAlias(source, UPDATE_SRC_ALIAS);
SqlMerge mergeCall =
@@ -1744,6 +1752,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
null,
null,
null,
+ null,
null);
}
@@ -1774,6 +1783,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
null,
null,
null,
+ null,
null);
}
@@ -2785,6 +2795,9 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
clauseScopes.put(IdPair.of(select, Clause.WHERE), selectScope);
registerOperandSubQueries(selectScope, select, SqlSelect.WHERE_OPERAND);
+ // Register subqueries in the QUALIFY clause
+ registerOperandSubQueries(selectScope, select, SqlSelect.QUALIFY_OPERAND);
+
// Register FROM with the inherited scope 'parentScope', not
// 'selectScope', otherwise tables in the FROM clause would be
// able to see each other.
@@ -3599,12 +3612,11 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
String name = id.names.get(0);
SqlNameMatcher nameMatcher = getCatalogReader().nameMatcher();
RelDataType rowType = getNamespaceOrThrow(node).getRowType();
- RelDataType colType =
+ final RelDataTypeField field =
requireNonNull(
- nameMatcher.field(rowType, name),
- () -> "unable to find left field " + name + " in " + rowType)
- .getType();
- return colType;
+ nameMatcher.field(rowType, name),
+ () -> "unable to find left field " + name + " in " + rowType);
+ return field.getType();
}
/**
@@ -3710,6 +3722,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
validateGroupClause(select);
validateHavingClause(select);
validateWindowClause(select);
+ validateQualifyClause(select);
handleOffsetFetch(select.getOffset(), select.getFetch());
// Validate the SELECT clause late, because a select item might
@@ -4158,6 +4171,34 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
windowList.validate(this, windowScope);
}
+ protected void validateQualifyClause(SqlSelect select) {
+ SqlNode qualifyNode = select.getQualify();
+ if (qualifyNode == null) {
+ return;
+ }
+
+ SqlValidatorScope qualifyScope = getSelectScope(select);
+
+ qualifyNode = extendedExpand(qualifyNode, qualifyScope, select, Clause.QUALIFY);
+ select.setQualify(qualifyNode);
+
+ inferUnknownTypes(booleanType, qualifyScope, qualifyNode);
+
+ qualifyNode.validate(this, qualifyScope);
+
+ final RelDataType type = deriveType(qualifyScope, qualifyNode);
+ if (!SqlTypeUtil.inBooleanFamily(type)) {
+ throw newValidationError(qualifyNode, RESOURCE.condMustBeBoolean("QUALIFY"));
+ }
+
+ boolean qualifyContainsWindowFunction = overFinder.findAgg(qualifyNode) != null;
+ if (!qualifyContainsWindowFunction) {
+ throw newValidationError(
+ qualifyNode,
+ RESOURCE.qualifyExpressionMustContainWindowFunction(qualifyNode.toString()));
+ }
+ }
+
@Override
public void validateWith(SqlWith with, SqlValidatorScope scope) {
final SqlValidatorNamespace namespace = getNamespaceOrThrow(with);
@@ -4349,7 +4390,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
// expand the expression in group list.
List<SqlNode> expandedList = new ArrayList<>();
for (SqlNode groupItem : groupList) {
- SqlNode expandedItem = expandGroupByOrHavingExpr(groupItem, groupScope, select, false);
+ SqlNode expandedItem = extendedExpand(groupItem, groupScope, select, Clause.GROUP_BY);
expandedList.add(expandedItem);
}
groupList = new SqlNodeList(expandedList, groupList.getParserPosition());
@@ -4475,7 +4516,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
}
final AggregatingScope havingScope = (AggregatingScope) getSelectScope(select);
if (config.conformance().isHavingAlias()) {
- SqlNode newExpr = expandGroupByOrHavingExpr(having, havingScope, select, true);
+ SqlNode newExpr = extendedExpand(having, havingScope, select, Clause.HAVING);
if (having != newExpr) {
having = newExpr;
select.setHaving(newExpr);
@@ -4981,14 +5022,13 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
// matched
boolean isUpdateModifiableViewTable = false;
if (query instanceof SqlUpdate) {
- final SqlNodeList targetColumnList = ((SqlUpdate) query).getTargetColumnList();
- if (targetColumnList != null) {
- final int targetColumnCnt = targetColumnList.size();
- targetRowType =
- SqlTypeUtil.extractLastNFields(typeFactory, targetRowType, targetColumnCnt);
- sourceRowType =
- SqlTypeUtil.extractLastNFields(typeFactory, sourceRowType, targetColumnCnt);
- }
+ final SqlNodeList targetColumnList =
+ requireNonNull(((SqlUpdate) query).getTargetColumnList());
+ final int targetColumnCount = targetColumnList.size();
+ targetRowType =
+ SqlTypeUtil.extractLastNFields(typeFactory, targetRowType, targetColumnCount);
+ sourceRowType =
+ SqlTypeUtil.extractLastNFields(typeFactory, sourceRowType, targetColumnCount);
isUpdateModifiableViewTable =
table.unwrap(ModifiableViewTable.class) != null;
}
if (SqlTypeUtil.equalAsStructSansNullability(
@@ -5493,13 +5533,12 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
node.validate(this, scope);
SqlIdentifier identifier;
if (node instanceof SqlBasicCall) {
- identifier = (SqlIdentifier) ((SqlBasicCall) node).operand(0);
+ identifier = ((SqlBasicCall) node).operand(0);
} else {
identifier =
- (SqlIdentifier)
- requireNonNull(
- node,
- () -> "order by field is null. All fields: " + orderBy);
+ requireNonNull(
+ (SqlIdentifier) node,
+ () -> "order by field is null. All fields: " + orderBy);
}
if (allRows) {
@@ -5721,7 +5760,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
public void validatePivot(SqlPivot pivot) {
final PivotScope scope =
- (PivotScope) requireNonNull(getJoinScope(pivot), () -> "joinScope for " + pivot);
+ requireNonNull((PivotScope) getJoinScope(pivot), () -> "joinScope for " + pivot);
final PivotNamespace ns =
getNamespaceOrThrow(pivot).unwrap(PivotNamespace.class);
assert ns.rowType == null;
@@ -6146,9 +6185,10 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
return newExpr;
}
- public SqlNode expandGroupByOrHavingExpr(
- SqlNode expr, SqlValidatorScope scope, SqlSelect select, boolean havingExpression) {
- final Expander expander = new ExtendedExpander(this, scope, select, expr, havingExpression);
+ /** Expands an expression in a GROUP BY, HAVING or QUALIFY clause. */
+ private SqlNode extendedExpand(
+ SqlNode expr, SqlValidatorScope scope, SqlSelect select, Clause clause) {
+ final Expander expander = new ExtendedExpander(this, scope, select, expr, clause);
SqlNode newExpr = expander.go(expr);
if (expr != newExpr) {
setOriginal(newExpr, expr);
@@ -6156,6 +6196,10 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
return newExpr;
}
+ public SqlNode extendedExpandGroupBy(SqlNode expr, SqlValidatorScope scope, SqlSelect select) {
+ return extendedExpand(expr, scope, select, Clause.GROUP_BY);
+ }
+
@Override
public boolean isSystemField(RelDataTypeField field) {
return false;
@@ -6770,67 +6814,74 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
static class ExtendedExpander extends Expander {
final SqlSelect select;
final SqlNode root;
- final boolean havingExpr;
+ final Clause clause;
ExtendedExpander(
SqlValidatorImpl validator,
SqlValidatorScope scope,
SqlSelect select,
SqlNode root,
- boolean havingExpr) {
+ Clause clause) {
super(validator, scope);
this.select = select;
this.root = root;
- this.havingExpr = havingExpr;
+ this.clause = clause;
}
@Override
public @Nullable SqlNode visit(SqlIdentifier id) {
- if (id.isSimple()
- && (havingExpr
- ? validator.config().conformance().isHavingAlias()
- : validator.config().conformance().isGroupByAlias())) {
- String name = id.getSimple();
- SqlNode expr = null;
- final SqlNameMatcher nameMatcher = validator.catalogReader.nameMatcher();
- int n = 0;
- for (SqlNode s : SqlNonNullableAccessors.getSelectList(select)) {
- final @Nullable String alias = SqlValidatorUtil.alias(s);
- if (alias != null && nameMatcher.matches(alias, name)) {
- expr = s;
- n++;
- }
- }
- if (n == 0) {
- return super.visit(id);
- } else if (n > 1) {
- // More than one column has this alias.
- throw validator.newValidationError(id, RESOURCE.columnAmbiguous(name));
- }
- if (havingExpr && validator.isAggregate(root)) {
- return super.visit(id);
- }
- expr = stripAs(expr);
- if (expr instanceof SqlIdentifier) {
- SqlIdentifier sid = (SqlIdentifier) expr;
- final SqlIdentifier fqId = getScope().fullyQualify(sid).identifier;
- expr = expandDynamicStar(sid, fqId);
- }
- return expr;
+ if (!id.isSimple()) {
+ return super.visit(id);
}
- if (id.isSimple()) {
+
+ final boolean replaceAliases = clause.shouldReplaceAliases(validator.config);
+ if (!replaceAliases) {
final SelectScope scope = validator.getRawSelectScope(select);
SqlNode node = expandCommonColumn(select, id, scope, validator);
if (node != id) {
return node;
}
+ return super.visit(id);
+ }
+
+ String name = id.getSimple();
+ SqlNode expr = null;
+ final SqlNameMatcher nameMatcher = validator.catalogReader.nameMatcher();
+ int n = 0;
+ for (SqlNode s : SqlNonNullableAccessors.getSelectList(select)) {
+ final @Nullable String alias = SqlValidatorUtil.alias(s);
+ if (alias != null && nameMatcher.matches(alias, name)) {
+ expr = s;
+ n++;
+ }
+ }
+
+ if (n == 0) {
+ return super.visit(id);
+ } else if (n > 1) {
+ // More than one column has this alias.
+ throw validator.newValidationError(id, RESOURCE.columnAmbiguous(name));
+ }
+ Iterable<SqlCall> allAggList = validator.aggFinder.findAll(ImmutableList.of(root));
+ for (SqlCall agg : allAggList) {
+ if (clause == Clause.HAVING && containsIdentifier(agg, id)) {
+ return super.visit(id);
+ }
}
- return super.visit(id);
+
+ expr = stripAs(expr);
+ if (expr instanceof SqlIdentifier) {
+ SqlIdentifier sid = (SqlIdentifier) expr;
+ final SqlIdentifier fqId = getScope().fullyQualify(sid).identifier;
+ expr = expandDynamicStar(sid, fqId);
+ }
+
+ return expr;
}
@Override
public @Nullable SqlNode visit(SqlLiteral literal) {
- if (havingExpr || !validator.config().conformance().isGroupByOrdinal()) {
+ if (clause != Clause.GROUP_BY || !validator.config().conformance().isGroupByOrdinal()) {
return super.visit(literal);
}
boolean isOrdinalLiteral = literal == root;
@@ -6878,6 +6929,32 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
return super.visit(literal);
}
+
+ /**
+ * Returns whether a given node contains a {@link SqlIdentifier}.
+ *
+ * @param sqlNode a SqlNode
+ * @param target a SqlIdentifier
+ */
+ private boolean containsIdentifier(SqlNode sqlNode, SqlIdentifier target) {
+ try {
+ SqlVisitor<Void> visitor =
+ new SqlBasicVisitor<Void>() {
+ @Override
+ public Void visit(SqlIdentifier identifier) {
+ if (identifier.equalsDeep(target, Litmus.IGNORE)) {
+ throw new Util.FoundOne(target);
+ }
+ return super.visit(identifier);
+ }
+ };
+ sqlNode.accept(visitor);
+ return false;
+ } catch (Util.FoundOne e) {
+ Util.swallow(e, null);
+ return true;
+ }
+ }
}
/** Information about an identifier in a particular scope. */
@@ -7350,6 +7427,52 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
GROUP_BY,
SELECT,
ORDER,
- CURSOR
+ CURSOR,
+ HAVING,
+ QUALIFY;
+
+ /**
+ * Determines if the expander should replace aliases with expanded values. For example:
+ *
+ * <blockquote>
+ *
+ * <pre>{@code
+ * SELECT a + a as twoA
+ * GROUP BY twoA
+ * }</pre>
+ *
+ * </blockquote>
+ *
+ * <p>turns into
+ *
+ * <blockquote>
+ *
+ * <pre>{@code
+ * SELECT a + a as twoA
+ * GROUP BY a + a
+ * }</pre>
+ *
+ * </blockquote>
+ *
+ * <p>This is determined both by the clause and the config.
+ *
+ * @param config The configuration
+ * @return Whether we should replace the alias with its expanded value
+ */
+ boolean shouldReplaceAliases(Config config) {
+ switch (this) {
+ case GROUP_BY:
+ return config.conformance().isGroupByAlias();
+
+ case HAVING:
+ return config.conformance().isHavingAlias();
+
+ case QUALIFY:
+ return true;
+
+ default:
+ throw Util.unexpected(this);
+ }
+ }
}
}
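The `ExtendedExpander.visit(SqlIdentifier)` rewrite and `Clause.shouldReplaceAliases` above decide when a simple identifier in GROUP BY, HAVING or QUALIFY is replaced by the SELECT item it aliases. The string-based Java sketch below is a hedged model of that decision table, not Calcite code: `Conformance`, `SelectItem`, and the `usedInsideAggregate` flag are hypothetical stand-ins for `SqlConformance`, `SqlNode` select items, and the `containsIdentifier` check over aggregate calls. It also omits `expandCommonColumn` and assumes case-insensitive name matching, whereas Calcite uses its configured `SqlNameMatcher`.

```java
import java.util.List;

/** Hypothetical, string-based model of alias expansion in GROUP BY, HAVING and QUALIFY. */
public class AliasExpansionSketch {

    /** Stand-in for the two conformance flags consulted by the real code. */
    static final class Conformance {
        final boolean groupByAlias;
        final boolean havingAlias;

        Conformance(boolean groupByAlias, boolean havingAlias) {
            this.groupByAlias = groupByAlias;
            this.havingAlias = havingAlias;
        }
    }

    enum Clause {
        GROUP_BY, HAVING, QUALIFY;

        /** Same decision table as Clause.shouldReplaceAliases in the diff. */
        boolean shouldReplaceAliases(Conformance conformance) {
            switch (this) {
                case GROUP_BY:
                    return conformance.groupByAlias;
                case HAVING:
                    return conformance.havingAlias;
                case QUALIFY:
                    return true;
                default:
                    throw new AssertionError(this);
            }
        }
    }

    /** A SELECT item: expression text plus an optional alias (may be null). */
    static final class SelectItem {
        final String expression;
        final String alias;

        SelectItem(String expression, String alias) {
            this.expression = expression;
            this.alias = alias;
        }
    }

    /**
     * Returns the expression that replaces {@code identifier}, or the identifier itself.
     *
     * @param usedInsideAggregate whether the identifier also occurs inside an aggregate call
     */
    static String expand(String identifier, List<SelectItem> selectList, Clause clause,
            Conformance conformance, boolean usedInsideAggregate) {
        if (!clause.shouldReplaceAliases(conformance)) {
            return identifier;
        }
        String match = null;
        int matches = 0;
        for (SelectItem item : selectList) {
            // Assumption: case-insensitive matching of aliases.
            if (item.alias != null && item.alias.equalsIgnoreCase(identifier)) {
                match = item.expression;
                matches++;
            }
        }
        if (matches == 0) {
            return identifier; // not an alias: resolve as an ordinary column
        }
        if (matches > 1) {
            throw new IllegalArgumentException("Column '" + identifier + "' is ambiguous");
        }
        if (clause == Clause.HAVING && usedInsideAggregate) {
            return identifier; // inside an aggregate, the name refers to the input column
        }
        return match;
    }

    public static void main(String[] args) {
        List<SelectItem> select = List.of(new SelectItem("a + a", "twoA"));
        Conformance strict = new Conformance(false, false);
        System.out.println(expand("twoA", select, Clause.GROUP_BY, strict, false)); // twoA
        System.out.println(expand("twoA", select, Clause.QUALIFY, strict, false)); // a + a
    }
}
```

The sketch shows the design choice in the diff: QUALIFY always substitutes aliases, so `QUALIFY rownum <= N` can refer to a window function named in the SELECT list, while GROUP BY and HAVING still depend on conformance.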
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index bc13de49927..9e6b25acb13 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -124,7 +124,11 @@ import static java.util.Objects.requireNonNull;
import static org.apache.calcite.linq4j.Nullness.castNonNull;
/**
- * Copied to fix calcite issues. FLINK modifications are at lines
+ * RelDecorrelator replaces all correlated expressions (corExp) in a relational expression (RelNode)
+ * tree with non-correlated expressions that are produced from joining the RelNode that produces the
+ * corExp with the RelNode that references it.
+ *
+ * <p>TODO:
*
* <ol>
* <li>Was changed within FLINK-29280, FLINK-28682, FLINK-35804: Line 218 ~ 225, Line 273 ~ 288
@@ -1069,17 +1073,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
// can directly add positions into corDefOutputs since join
// does not change the output ordering from the inputs.
- RelNode valueGen =
- requireNonNull(
- createValueGenerator(corVarList, leftInputOutputCount, corDefOutputs),
- "createValueGenerator(...) is null");
+ final RelNode valueGen =
+ createValueGenerator(corVarList, leftInputOutputCount, corDefOutputs);
+ requireNonNull(valueGen, "valueGen");
- RelNode join =
- relBuilder
- .push(frame.r)
- .push(valueGen)
- .join(JoinRelType.INNER, relBuilder.literal(true), ImmutableSet.of())
- .build();
+ RelNode join = relBuilder.push(frame.r).push(valueGen).join(JoinRelType.INNER).build();
// Join or Filter does not change the old input ordering. All
// input fields from newLeftInput (i.e. the original input to the old
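The comment in this hunk relies on the join preserving field order: positions can be added directly into `corDefOutputs` because every output row lists the left input's fields first and the value generator's fields after them. Both the old `.join(JoinRelType.INNER, relBuilder.literal(true), ImmutableSet.of())` and the new `.join(JoinRelType.INNER)` ask, as we understand `RelBuilder`, for an inner join on an always-true condition, that is, a Cartesian product. The hypothetical in-memory sketch below (rows as plain lists, not `RelNode`s) shows why a left field keeps its index and a right field `j` lands at index `leftFieldCount + j`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an inner join with an always-true condition (Cartesian product). */
public class CrossJoinSketch {

    static List<List<Object>> crossJoin(List<List<Object>> left, List<List<Object>> right) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> l : left) {
            for (List<Object> r : right) {
                // Output row = all left fields in order, then all right fields in order.
                List<Object> joined = new ArrayList<>(l);
                joined.addAll(r);
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> left = List.of(List.<Object>of(1, "x"));
        List<List<Object>> right = List.of(List.<Object>of(10), List.<Object>of(20));
        // Left field i stays at index i; right field j moves to index 2 + j here.
        System.out.println(crossJoin(left, right)); // [[1, x, 10], [1, x, 20]]
    }
}
```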
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index e1111ef812f..40477e1e2db 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -98,6 +98,7 @@ import org.apache.calcite.rex.RexFieldCollation;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexPatternFieldRef;
import org.apache.calcite.rex.RexRangeRef;
import org.apache.calcite.rex.RexShuttle;
@@ -225,6 +226,7 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.calcite.linq4j.Nullness.castNonNull;
+import static org.apache.calcite.runtime.FlatLists.append;
import static org.apache.calcite.sql.SqlUtil.stripAs;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -781,6 +783,8 @@ public class SqlToRelConverter {
convertSelectList(bb, select, orderExprList);
}
+ convertQualify(bb, select.getQualify());
+
if (select.isDistinct()) {
distinctify(bb, true);
}
@@ -895,8 +899,7 @@ public class SqlToRelConverter {
}
assert rel != null : "rel must not be null, root = " + bb.root;
- // Usual case: all of the expressions in the SELECT clause are
- // different.
+ // Usual case: all expressions in the SELECT clause are different.
final ImmutableBitSet groupSet =
ImmutableBitSet.range(rel.getRowType().getFieldCount());
rel = createAggregate(bb, groupSet, ImmutableList.of(groupSet), ImmutableList.of());
@@ -3369,8 +3372,8 @@ public class SqlToRelConverter {
condition,
convertJoinType(join.getJoinType()));
relBuilder.push(joinRel);
- final RelNode newProjectRel = relBuilder.project(relBuilder.fields()).build();
- bb.setRoot(newProjectRel, false);
+ relBuilder.project(relBuilder.fields());
+ bb.setRoot(relBuilder.build(), false);
}
private RexNode convertNaturalCondition(
@@ -3388,8 +3391,8 @@ public class SqlToRelConverter {
SqlValidatorNamespace leftNamespace,
SqlValidatorNamespace rightNamespace) {
final SqlNodeList list =
- (SqlNodeList)
- requireNonNull(join.getCondition(), () -> "getCondition for join " + join);
+ requireNonNull(
+ (SqlNodeList) join.getCondition(), () -> "getCondition for join " + join);
return convertUsing(
leftNamespace,
rightNamespace,
@@ -4317,10 +4320,9 @@ public class SqlToRelConverter {
private RelNode convertDelete(SqlDelete call) {
RelOptTable targetTable = getTargetTable(call);
- RelNode sourceRel =
- convertSelect(
- requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call),
- false);
+ final SqlSelect sourceSelect =
+ requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call);
+ RelNode sourceRel = convertSelect(sourceSelect, false);
return LogicalTableModify.create(
targetTable,
catalogReader,
@@ -4332,11 +4334,9 @@ public class SqlToRelConverter {
}
private RelNode convertUpdate(SqlUpdate call) {
- final SqlValidatorScope scope =
- validator()
- .getWhereScope(
- requireNonNull(
- call.getSourceSelect(), () -> "sourceSelect for " + call));
+ final SqlSelect sourceSelect =
+ requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call);
+ final SqlValidatorScope scope = validator().getWhereScope(sourceSelect);
Blackboard bb = createBlackboard(scope, null, false);
replaceSubQueries(bb, call, RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
@@ -4351,14 +4351,11 @@ public class SqlToRelConverter {
RelDataTypeField field =
SqlValidatorUtil.getTargetField(
targetRowType, typeFactory, id, catalogReader, targetTable);
- assert field != null : "column " + id.toString() + " not found";
+ assert field != null : "column " + id + " not found";
targetColumnNameList.add(field.getName());
}
- RelNode sourceRel =
- convertSelect(
- requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call),
- false);
+ RelNode sourceRel = convertSelect(sourceSelect, false);
bb.setRoot(sourceRel, false);
ImmutableList.Builder<RexNode> rexNodeSourceExpressionListBuilder =
ImmutableList.builder();
@@ -4404,10 +4401,9 @@ public class SqlToRelConverter {
// first, convert the merge's source select to construct the columns
// from the target table and the set expressions in the update call
- RelNode mergeSourceRel =
- convertSelect(
- requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call),
- false);
+ final SqlSelect sourceSelect =
+ requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call);
+ RelNode mergeSourceRel = convertSelect(sourceSelect, false);
// then, convert the insert statement so we can get the insert
// values expressions
@@ -4684,6 +4680,10 @@ public class SqlToRelConverter {
selectList = validator().expandStar(selectList, select, false);
replaceSubQueries(bb, selectList, RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
+ replaceSubQueries(
+ bb,
+ new SqlNodeList(orderList, SqlParserPos.ZERO),
+ RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
List<String> fieldNames = new ArrayList<>();
final List<RexNode> exprs = new ArrayList<>();
@@ -4780,6 +4780,118 @@ public class SqlToRelConverter {
return alias;
}
+ private void convertQualify(Blackboard bb, @Nullable SqlNode qualify) {
+ if (qualify == null) {
+ return;
+ }
+
+ final LogicalProject projectionFromSelect =
+ requireNonNull((LogicalProject) bb.root, "root");
+
+ // Convert qualify SqlNode to a RexNode
+ replaceSubQueries(bb, qualify, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
+ final RelNode originalRoot = requireNonNull(bb.root, "root");
+ RexNode qualifyRexNode;
+ try {
+ // Set the root to the input of the project,
+ // since QUALIFY might have an expression in the OVER clause
+ // that references a column not in the SELECT.
+ bb.setRoot(projectionFromSelect.getInput(), false);
+ qualifyRexNode = bb.convertExpression(qualify);
+ } finally {
+ bb.setRoot(originalRoot, false);
+ }
+
+ // Check to see if the qualify expression has a referenced expression and
+ // do some referencing accordingly
+ final RexNode qualifyWithReferencesRexNode =
+ qualifyRexNode.accept(new DuplicateEliminator(projectionFromSelect.getProjects()));
+
+ // Create a Project with the QUALIFY expression
+ if (qualifyWithReferencesRexNode.equals(qualifyRexNode)) {
+ // The QUALIFY expression does not depend on any references like so:
+ //
+ // SELECT A, B
+ // FROM tbl
+ // QUALIFY WINDOW(C) = 1
+ //
+ // Meaning we should generate a plan like:
+ // Project(A, B, WINDOW(C) = 1 as QualifyExpression)
+ // TableScan(tbl)
+ //
+ relBuilder
+ .push(projectionFromSelect.getInput())
+ .project(
+ append(projectionFromSelect.getProjects(), qualifyRexNode),
+ append(
+ projectionFromSelect.getRowType().getFieldNames(),
+ "QualifyExpression"));
+ } else {
+ // The QUALIFY expression depended on a reference meaning
+ // we need to introduce an extra project like so:
+ //
+ // SELECT A, B, WINDOW(C) as window_val
+ // FROM tbl
+ // QUALIFY window_val = 1
+ //
+ // Meaning we should generate a plan like:
+ //
+ // Project($0, $1, $2, =($2, 1) as QualifyExpression)
+ // Project(A, B, WINDOW(C) as window_val)
+ // TableScan(tbl)
+ //
+ // This is a very specific application of Common Subexpression Elimination
+ // (CSE), since the window value pops up twice.
+ relBuilder
+ .push(requireNonNull(bb.root, "root"))
+ .project(
+ append(relBuilder.fields(), qualifyWithReferencesRexNode),
+ append(
+ relBuilder.peek().getRowType().getFieldNames(),
+ "QualifyExpression"));
+ }
+
+ // Filter on that extra column
+ relBuilder.filter(Util.last(relBuilder.fields()));
+
+ // Remove that extra column from the projection
+ relBuilder.project(
+ Util.first(relBuilder.fields(), projectionFromSelect.getProjects().size()));
+
+ // Update the root
+ bb.setRoot(relBuilder.build(), false);
+ }
+
+ /**
+ * Eliminates a common sub-expression by looking for a {@link RexNode} in the expressions of a
+ * {@link Project}; if found, returns a refIndex instead of the raw node.
+ */
+ private static final class DuplicateEliminator extends RexShuttle {
+ private final List<RexNode> projects;
+
+ DuplicateEliminator(List<RexNode> projects) {
+ this.projects = projects;
+ }
+
+ @Override
+ public RexNode visitCall(RexCall call) {
+ final int i = projects.indexOf(call);
+ if (i >= 0) {
+ return new RexInputRef(i, projects.get(i).getType());
+ }
+ return super.visitCall(call);
+ }
+
+ @Override
+ public RexNode visitOver(RexOver over) {
+ final int i = projects.indexOf(over);
+ if (i >= 0) {
+ return new RexInputRef(i, projects.get(i).getType());
+ }
+ return over;
+ }
+ }
+
/** Converts a WITH sub-query into a relational expression. */
public RelRoot convertWith(SqlWith with, boolean top) {
return convertQuery(with.body, false, top);
@@ -6740,15 +6852,23 @@ public class SqlToRelConverter {
Config withExplain(boolean explain);
/**
- * Returns the {@code expand} option. Controls whether to expand sub-queries. If false, each
- * sub-query becomes a {@link org.apache.calcite.rex.RexSubQuery}.
+ * Returns the {@code expand} option. Controls whether to expand sub-queries. If false (the
+ * default), each sub-query becomes a {@link org.apache.calcite.rex.RexSubQuery}.
+ *
+ * <p>Setting {@code expand} to true is deprecated. Expansion still works, but there will be
+ * less development effort in that area.
*/
@Value.Default
default boolean isExpand() {
- return true;
+ return false;
}
- /** Sets {@link #isExpand()}. */
+ /**
+ * Sets {@link #isExpand()}.
+ *
+ * <p>Expansion is deprecated. We recommend that you do not call this method, and use the
+ * default value of {@link #isExpand()}, false.
+ */
Config withExpand(boolean expand);
/**
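In `convertQualify`, the choice between the two plans sketched in its comments hinges on whether `DuplicateEliminator` rewrote anything: a call or OVER expression that already appears in the SELECT projection is replaced by a reference to that column. The toy model below mirrors that matching with hypothetical records (`Expr`, `Col`, `Lit`, `Ref`, `Call`) standing in for `RexNode`, `RexInputRef` and friends; it is not the Calcite API, and, like the shuttle, it only matches call nodes (Java 16+).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the DuplicateEliminator idea. Expr, Col, Lit, Ref and Call are
// hypothetical stand-ins for Calcite's RexNode classes.
class CseSketch {
    interface Expr {}
    record Col(String name) implements Expr {}
    record Lit(int value) implements Expr {}
    record Ref(int index) implements Expr {}
    record Call(String op, List<Expr> operands) implements Expr {}

    // Replace any call that already appears in 'projects' with a reference
    // to that projection column; otherwise rewrite its operands recursively.
    static Expr eliminate(Expr e, List<Expr> projects) {
        if (e instanceof Call c) {
            int i = projects.indexOf(c);
            if (i >= 0) {
                return new Ref(i); // reuse the already-projected column
            }
            List<Expr> rewritten = new ArrayList<>();
            for (Expr operand : c.operands()) {
                rewritten.add(eliminate(operand, projects));
            }
            return new Call(c.op(), rewritten);
        }
        return e; // columns and literals are left untouched
    }

    public static void main(String[] args) {
        // SELECT A, B, ROW_NUMBER(C) AS window_val ... QUALIFY window_val = 1
        Expr window = new Call("ROW_NUMBER", List.of(new Col("C")));
        List<Expr> projects = List.of(new Col("A"), new Col("B"), window);
        Expr qualify = new Call("=", List.of(window, new Lit(1)));
        System.out.println(eliminate(qualify, projects));
    }
}
```

If the rewritten expression differs from the original (as here, where the window call becomes column 2), the extra Project on top of the SELECT's Project is used; otherwise the predicate is appended directly to the SELECT projection.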
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java
index 71580fb2ea9..ce40f6bbeab 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java
@@ -93,6 +93,7 @@ import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -105,7 +106,7 @@ import static org.apache.calcite.util.Util.first;
* <p>FLINK modifications are at lines
*
* <ol>
- * <li>Added in Flink-35216: Lines 712 ~ 757
+ * <li>Added in Flink-35216: Lines 731 ~ 776
* </ol>
*/
public class StandardConvertletTable extends ReflectiveConvertletTable {
@@ -154,6 +155,19 @@ public class StandardConvertletTable extends ReflectiveConvertletTable {
}
});
+ // DATE(string) is equivalent to CAST(string AS DATE),
+ // but other DATE variants are treated as regular functions.
+ registerOp(
+ SqlLibraryOperators.DATE,
+ (cx, call) -> {
+ final RexCall e = (RexCall) StandardConvertletTable.this.convertCall(cx, call);
+ if (e.getOperands().size() == 1
+ && SqlTypeUtil.isString(e.getOperands().get(0).getType())) {
+ return cx.getRexBuilder().makeCast(e.type, e.getOperands().get(0));
+ }
+ return e;
+ });
+
registerOp(SqlLibraryOperators.LTRIM, new TrimConvertlet(SqlTrimFunction.Flag.LEADING));
registerOp(SqlLibraryOperators.RTRIM, new TrimConvertlet(SqlTrimFunction.Flag.TRAILING));
@@ -166,7 +180,12 @@ public class StandardConvertletTable extends ReflectiveConvertletTable {
registerOp(
SqlLibraryOperators.SUBSTR_POSTGRESQL, new SubstrConvertlet(SqlLibrary.POSTGRESQL));
+ registerOp(SqlLibraryOperators.DATE_ADD, new TimestampAddConvertlet());
+ registerOp(SqlLibraryOperators.DATE_DIFF, new TimestampDiffConvertlet());
registerOp(SqlLibraryOperators.DATE_SUB, new TimestampSubConvertlet());
+ registerOp(SqlLibraryOperators.DATETIME_ADD, new TimestampAddConvertlet());
+ registerOp(SqlLibraryOperators.DATETIME_DIFF, new TimestampDiffConvertlet());
+ registerOp(SqlLibraryOperators.DATETIME_SUB, new TimestampSubConvertlet());
registerOp(SqlLibraryOperators.TIME_ADD, new TimestampAddConvertlet());
registerOp(SqlLibraryOperators.TIME_DIFF, new TimestampDiffConvertlet());
registerOp(SqlLibraryOperators.TIME_SUB, new TimestampSubConvertlet());
@@ -1377,15 +1396,11 @@ public class StandardConvertletTable extends ReflectiveConvertletTable {
private static SqlNode getCastedSqlNode(
SqlNode argInput, RelDataType varType, SqlParserPos pos,
@Nullable RexNode argRex) {
- SqlNode arg;
- if (argRex != null && !argRex.getType().equals(varType)) {
- arg =
- SqlStdOperatorTable.CAST.createCall(
- pos, argInput, SqlTypeUtil.convertTypeToSpec(varType));
- } else {
- arg = argInput;
+ if (argRex == null || argRex.getType().equals(varType)) {
+ return argInput;
}
- return arg;
+ return SqlStdOperatorTable.CAST.createCall(
+ pos, argInput, SqlTypeUtil.convertTypeToSpec(varType));
}
}
@@ -1523,15 +1538,11 @@ public class StandardConvertletTable extends ReflectiveConvertletTable {
private static SqlNode getCastedSqlNode(
SqlNode argInput, RelDataType varType, SqlParserPos pos,
@Nullable RexNode argRex) {
- SqlNode arg;
- if (argRex != null && !argRex.getType().equals(varType)) {
- arg =
- SqlStdOperatorTable.CAST.createCall(
- pos, argInput, SqlTypeUtil.convertTypeToSpec(varType));
- } else {
- arg = argInput;
+ if (argRex == null || argRex.getType().equals(varType)) {
+ return argInput;
}
- return arg;
+ return SqlStdOperatorTable.CAST.createCall(
+ pos, argInput, SqlTypeUtil.convertTypeToSpec(varType));
}
}
@@ -1797,6 +1808,9 @@ public class StandardConvertletTable extends ReflectiveConvertletTable {
public RexNode convertCall(SqlRexContext cx, SqlCall call) {
// TIMESTAMPADD(unit, count, timestamp)
// => timestamp + count * INTERVAL '1' UNIT
+ // TIMESTAMP_ADD(timestamp, interval)
+ // => timestamp + interval
+ // "timestamp" may be of type TIMESTAMP or TIMESTAMP WITH LOCAL TIME ZONE.
final RexBuilder rexBuilder = cx.getRexBuilder();
SqlIntervalQualifier qualifier;
final RexNode op1;
@@ -1911,34 +1925,79 @@ public class StandardConvertletTable extends ReflectiveConvertletTable {
private static class TimestampDiffConvertlet implements SqlRexConvertlet {
@Override
public RexNode convertCall(SqlRexContext cx, SqlCall call) {
+ // The standard TIMESTAMPDIFF and BigQuery's TIMESTAMP_DIFF have two key
+ // differences. The first being the order of the subtraction, outlined
+ // below. The second is that BigQuery truncates each timestamp to the
+ // specified time unit before the difference is computed.
+ //
+ // In fact, all BigQuery functions (TIMESTAMP_DIFF, DATETIME_DIFF,
+ // DATE_DIFF) truncate before subtracting when applied to date intervals
+ // (DAY, WEEK, ISOWEEK, MONTH, YEAR, etc.)
+ //
+ // For example, if computing the number of weeks between two timestamps,
+ // one occurring on a Saturday and the other occurring the next day on
+ // Sunday, their week difference is 1. This is because the first timestamp
+ // is truncated to the previous Sunday. This is done by making calls to
+ // TIMESTAMP_TRUNC and the difference is then computed using their
+ // results.
+ //
// TIMESTAMPDIFF(unit, t1, t2)
// => (t2 - t1) UNIT
// TIMESTAMP_DIFF(t1, t2, unit)
// => (t1 - t2) UNIT
SqlIntervalQualifier qualifier;
+ final boolean preTruncate;
final RexNode op1;
final RexNode op2;
if (call.operand(0).getKind() == SqlKind.INTERVAL_QUALIFIER) {
qualifier = call.operand(0);
+ preTruncate = false;
op1 = cx.convertExpression(call.operand(1));
op2 = cx.convertExpression(call.operand(2));
} else {
qualifier = call.operand(2);
+ preTruncate = qualifier.isDate();
op1 = cx.convertExpression(call.operand(1));
op2 = cx.convertExpression(call.operand(0));
}
final RexBuilder rexBuilder = cx.getRexBuilder();
+ final RelDataTypeFactory typeFactory = cx.getTypeFactory();
final TimeFrame timeFrame =
cx.getValidator().validateTimeFrame(qualifier);
final TimeUnit unit = first(timeFrame.unit(), TimeUnit.EPOCH);
+ UnaryOperator<RexNode> truncateFn = UnaryOperator.identity();
if (unit == TimeUnit.EPOCH && qualifier.timeFrameName != null) {
// Custom time frames have a different path. They are kept as names, and
// then handled by Java functions.
final RexLiteral timeFrameName =
rexBuilder.makeLiteral(qualifier.timeFrameName);
+ // This additional logic accounts for BigQuery truncating prior to
+ // computing the difference.
+ if (preTruncate) {
+ truncateFn =
+ e ->
+ rexBuilder.makeCall(
+ e.getType(),
+ SqlLibraryOperators.TIMESTAMP_TRUNC,
+ ImmutableList.of(e, timeFrameName));
+ }
return rexBuilder.makeCall(
cx.getValidator().getValidatedNodeType(call),
SqlStdOperatorTable.TIMESTAMP_DIFF,
- ImmutableList.of(timeFrameName, op1, op2));
+ ImmutableList.of(
+ timeFrameName, truncateFn.apply(op1), truncateFn.apply(op2)));
+ }
+
+ if (preTruncate) {
+ // The timestamps should be truncated unless the time unit is HOUR, in
+ // which case only the whole number of hours between the timestamps
+ // should be returned.
+ final RexNode timeUnit = cx.convertExpression(qualifier);
+ truncateFn =
+ e ->
+ rexBuilder.makeCall(
+ e.getType(),
+ SqlLibraryOperators.TIMESTAMP_TRUNC,
+ ImmutableList.of(e, timeUnit));
}
BigDecimal multiplier = BigDecimal.ONE;
@@ -1968,23 +2027,21 @@ public class StandardConvertletTable extends ReflectiveConvertletTable {
qualifier = new SqlIntervalQualifier(unit, null, qualifier.getParserPosition());
break;
}
+
final RelDataType intervalType =
- cx.getTypeFactory()
- .createTypeWithNullability(
- cx.getTypeFactory().createSqlIntervalType(qualifier),
- op1.getType().isNullable() || op2.getType().isNullable());
- final RexCall rexCall =
- (RexCall)
- rexBuilder.makeCall(
- intervalType,
- SqlStdOperatorTable.MINUS_DATE,
- ImmutableList.of(op2, op1));
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlIntervalType(qualifier),
+ op1.getType().isNullable() || op2.getType().isNullable());
+ final RexNode call2 =
+ rexBuilder.makeCall(
+ intervalType,
+ SqlStdOperatorTable.MINUS_DATE,
+ ImmutableList.of(truncateFn.apply(op2), truncateFn.apply(op1)));
final RelDataType intType =
- cx.getTypeFactory()
- .createTypeWithNullability(
- cx.getTypeFactory().createSqlType(sqlTypeName),
- SqlTypeUtil.containsNullable(rexCall.getType()));
- RexNode e = rexBuilder.makeCast(intType, rexCall);
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(sqlTypeName),
+ SqlTypeUtil.containsNullable(call2.getType()));
+ RexNode e = rexBuilder.makeCast(intType, call2);
return rexBuilder.multiplyDivide(e, multiplier, divider);
}
}
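The `TimestampDiffConvertlet` comment gives a concrete case: for a Saturday and the following Sunday, BigQuery-style DATE_DIFF/TIMESTAMP_DIFF reports a week difference of 1 because each value is first truncated to the start of its week (Sunday). The sketch below reproduces that arithmetic with `java.time` rather than Calcite's `TIMESTAMP_TRUNC`/`MINUS_DATE` rewrite; the helper names are hypothetical, and `ChronoUnit.WEEKS` (whole weeks between two dates) is used as an approximation of the non-truncating standard path.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;

// Models the week-difference semantics described in the convertlet comment
// using java.time; this is not the Calcite implementation.
class WeekDiffSketch {
    // Truncate to the start of the week, assuming weeks start on Sunday.
    static LocalDate truncateToWeek(LocalDate d) {
        return d.with(TemporalAdjusters.previousOrSame(DayOfWeek.SUNDAY));
    }

    // BigQuery argument order: DATE_DIFF(t1, t2, WEEK) => t1 - t2, after truncation.
    static long bigQueryWeekDiff(LocalDate t1, LocalDate t2) {
        return ChronoUnit.WEEKS.between(truncateToWeek(t2), truncateToWeek(t1));
    }

    // Standard argument order: TIMESTAMPDIFF(WEEK, t1, t2) => t2 - t1, no truncation.
    static long standardWeekDiff(LocalDate t1, LocalDate t2) {
        return ChronoUnit.WEEKS.between(t1, t2);
    }

    public static void main(String[] args) {
        LocalDate saturday = LocalDate.of(2024, 11, 23);
        LocalDate sunday = LocalDate.of(2024, 11, 24);
        System.out.println(bigQueryWeekDiff(sunday, saturday)); // 1
        System.out.println(standardWeekDiff(saturday, sunday)); // 0
    }
}
```

Saturday 2024-11-23 truncates to Sunday 2024-11-17 while Sunday 2024-11-24 stays put, so the truncated values are exactly one week apart even though the raw dates differ by a single day.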
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 0c21d0af78c..22beba442a8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -2288,7 +2288,7 @@ public class RelBuilder {
@Nullable Iterable<? extends @Nullable String> fieldNames,
boolean force,
Iterable<CorrelationId> variablesSet) {
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
final List<? extends RexNode> nodeList =
nodes instanceof List ? (List) nodes : ImmutableList.copyOf(nodes);
final List<@Nullable String> fieldNameList =
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
index 2918375f6cd..6715ff98c4b 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
@@ -8,8 +8,8 @@ This project bundles the following dependencies under the Apache Software Licens
- com.google.guava:guava:32.1.3-jre
- com.google.guava:failureaccess:1.0.1
-- org.apache.calcite:calcite-core:1.33.0
-- org.apache.calcite:calcite-linq4j:1.33.0
+- org.apache.calcite:calcite-core:1.34.0
+- org.apache.calcite:calcite-linq4j:1.34.0
- org.apache.calcite.avatica:avatica-core:1.23.0
- org.apache.commons:commons-math3:3.6.1
- commons-codec:commons-codec:1.15
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
index 087aa6ab438..a02236f687c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
@@ -532,6 +532,48 @@ class OverAggregateITCase extends BatchTestBase {
)
}
+ @Test
+ def testWindowAggregationSumWithQualify(): Unit = {
+ checkResult(
+ "SELECT d, e FROM Table5 QUALIFY sum(e) OVER (PARTITION BY d ORDER BY e) > 20",
+ Seq(
+ row(4, 9),
+ row(4, 10),
+ row(5, 12),
+ row(5, 13),
+ row(5, 14),
+ row(5, 15)
+ )
+ )
+ }
+
+ @Test
+ def testWindowAggregationRowNumberWithQualify(): Unit = {
+ checkResult(
+ "SELECT d, e, row_number() OVER (PARTITION BY d ORDER BY e) AS rownum FROM Table5 " +
+ "QUALIFY rownum = 1",
+ Seq(
+ row(1, 1, 1),
+ row(2, 2, 1),
+ row(3, 4, 1),
+ row(4, 7, 1),
+ row(5, 11, 1)
+ )
+ )
+ }
+
+ @Test
+ def testWindowAggregationCountWithQualify(): Unit = {
+ checkResult(
+ "SELECT d, e FROM Table5 QUALIFY count(*) OVER (PARTITION BY d ORDER BY e) = 3",
+ Seq(
+ row(3, 6),
+ row(4, 9),
+ row(5, 13)
+ )
+ )
+ }
+
@Test
def testWindowAggregationCountWithOrderBy(): Unit = {
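The new tests exercise QUALIFY, which filters on a window-function result after the SELECT list is evaluated. According to the `convertQualify` comments, the planner appends the predicate as an extra `QualifyExpression` column, filters on that column, and then projects it away. The following is a minimal in-memory sketch of that three-step shape for a `ROW_NUMBER() = 1` filter; the `Row` and `RowWithFlag` records and the sample rows are hypothetical and unrelated to Flink's test harness or `Table5` (Java 16+).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory evaluation of
//   SELECT d, e FROM t QUALIFY ROW_NUMBER() OVER (PARTITION BY d ORDER BY e) = 1
// following the Project -> Filter -> Project shape built by convertQualify.
class QualifySketch {
    record Row(int d, int e) {}

    // A row plus the extra boolean column the planner names "QualifyExpression".
    record RowWithFlag(Row row, boolean qualifyExpression) {}

    static List<Row> firstPerPartition(List<Row> input) {
        // Order rows by (d, e) so ROW_NUMBER() can be assigned per partition d.
        List<Row> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingInt(Row::d).thenComparingInt(Row::e));

        // Step 1 (Project): append the QUALIFY predicate as an extra column.
        Map<Integer, Integer> rowNumberByPartition = new HashMap<>();
        List<RowWithFlag> projected = new ArrayList<>();
        for (Row r : sorted) {
            int rowNumber = rowNumberByPartition.merge(r.d(), 1, Integer::sum);
            projected.add(new RowWithFlag(r, rowNumber == 1));
        }

        // Step 2 (Filter) and step 3 (Project): keep flagged rows, drop the flag.
        List<Row> result = new ArrayList<>();
        for (RowWithFlag p : projected) {
            if (p.qualifyExpression()) {
                result.add(p.row());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row(1, 1), new Row(2, 3), new Row(2, 2), new Row(3, 5), new Row(3, 4));
        System.out.println(firstPerPartition(rows));
    }
}
```

Keeping the predicate as an ordinary projected column lets the existing Filter and Project machinery handle QUALIFY without a dedicated relational operator.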
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
index 10662856840..677d3602ebf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
@@ -232,4 +232,23 @@ class RankITCase extends BatchTestBase {
)
}
+ @Test
+ def testRankFunctionsWithQualify(): Unit = {
+ checkResult(
+ "SELECT a, b, c FROM Table3 " +
+ "QUALIFY RANK() OVER (PARTITION BY b ORDER BY a) > 2 ",
+ Seq(
+ row(6, 3, "Luke Skywalker"),
+ row(9, 4, "Comment#3"),
+ row(10, 4, "Comment#4"),
+ row(13, 5, "Comment#7"),
+ row(14, 5, "Comment#8"),
+ row(15, 5, "Comment#9"),
+ row(18, 6, "Comment#12"),
+ row(19, 6, "Comment#13"),
+ row(20, 6, "Comment#14"),
+ row(21, 6, "Comment#15")
+ )
+ )
+ }
}
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 4329be9be75..4163b2a8d5f 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -78,8 +78,8 @@ under the License.
</dependencyManagement>
<properties>
- <calcite.version>1.33.0</calcite.version>
- <!-- Calcite 1.33.0 depends on 3.1.8,
+ <calcite.version>1.34.0</calcite.version>
+ <!-- Calcite 1.34.0 depends on 3.1.8,
at the same time minimum 3.1.x Janino version passing Flink tests without WAs is 3.1.10,
more details are in FLINK-27995 -->
<janino.version>3.1.10</janino.version>