This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ccfd8be19ba  [FLINK-31836][table] Upgrade Calcite version to 1.34.0
ccfd8be19ba is described below

commit ccfd8be19ba1d38e5cd6fb5c7e18dc07019375b5
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Nov 25 12:53:56 2024 +0100

     [FLINK-31836][table] Upgrade Calcite version to 1.34.0
---
 .../docs/dev/table/sql/queries/deduplication.md    |   8 +-
 docs/content.zh/docs/dev/table/sql/queries/topn.md |  12 +-
 .../docs/dev/table/concepts/versioned_tables.md    |   8 +-
 .../docs/dev/table/sql/queries/deduplication.md    |   8 +-
 docs/content/docs/dev/table/sql/queries/topn.md    |  12 +-
 flink-table/flink-sql-parser/pom.xml               |   4 +-
 .../src/main/codegen/data/Parser.tdd               |   2 +
 .../src/main/codegen/templates/Parser.jj           | 148 +++++++++++-
 flink-table/flink-table-calcite-bridge/pom.xml     |   8 +-
 .../calcite/sql/validate/SqlValidatorImpl.java     | 253 +++++++++++++++------
 .../apache/calcite/sql2rel/RelDecorrelator.java    |  20 +-
 .../apache/calcite/sql2rel/SqlToRelConverter.java  | 176 +++++++++++---
 .../calcite/sql2rel/StandardConvertletTable.java   | 123 +++++++---
 .../java/org/apache/calcite/tools/RelBuilder.java  |   2 +-
 .../src/main/resources/META-INF/NOTICE             |   4 +-
 .../runtime/batch/sql/OverAggregateITCase.scala    |  42 ++++
 .../planner/runtime/batch/sql/RankITCase.scala     |  19 ++
 flink-table/pom.xml                                |   4 +-
 18 files changed, 663 insertions(+), 190 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/queries/deduplication.md 
b/docs/content.zh/docs/dev/table/sql/queries/deduplication.md
index b81c49213c4..3368f0ced5f 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/deduplication.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/deduplication.md
@@ -33,12 +33,8 @@ Flink 使用 `ROW_NUMBER()` 去除重复数据,就像 Top-N 查询一样。其
 
 ```sql
 SELECT [column_list]
-FROM (
-   SELECT [column_list],
-     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-       ORDER BY time_attr [asc|desc]) AS rownum
-   FROM table_name)
-WHERE rownum = 1
+FROM table_name
+QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) = 1
 ```
 
 **参数说明:**
diff --git a/docs/content.zh/docs/dev/table/sql/queries/topn.md 
b/docs/content.zh/docs/dev/table/sql/queries/topn.md
index 7339ca3b590..131895597c9 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/topn.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/topn.md
@@ -32,13 +32,11 @@ Flink 使用 `OVER` 窗口子句和过滤条件的组合来表达一个 Top-N 
 下面展示了 Top-N 的语法:
 
 ```sql
-SELECT [column_list]
-FROM (
-   SELECT [column_list],
-     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
-   FROM table_name)
-WHERE rownum <= N [AND conditions]
+SELECT [column_list],
+    ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) AS rownum
+FROM table_name
+QUALIFY rownum <= N
+[WHERE conditions]
 ```
 
 **参数说明:**
diff --git a/docs/content/docs/dev/table/concepts/versioned_tables.md 
b/docs/content/docs/dev/table/concepts/versioned_tables.md
index 4cf494898c3..d42bb110eca 100644
--- a/docs/content/docs/dev/table/concepts/versioned_tables.md
+++ b/docs/content/docs/dev/table/concepts/versioned_tables.md
@@ -164,12 +164,8 @@ In general, the results of a query with the following 
format produces a versione
 
 ```sql
 SELECT [column_list]
-FROM (
-   SELECT [column_list],
-     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-       ORDER BY time_attr DESC) AS rownum
-   FROM table_name)
-WHERE rownum = 1
+FROM table_name
+QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) = 1
 ```
 
 **Parameter Specification:**
diff --git a/docs/content/docs/dev/table/sql/queries/deduplication.md 
b/docs/content/docs/dev/table/sql/queries/deduplication.md
index 18b517e6e56..08a02c1c8ba 100644
--- a/docs/content/docs/dev/table/sql/queries/deduplication.md
+++ b/docs/content/docs/dev/table/sql/queries/deduplication.md
@@ -33,12 +33,8 @@ The following shows the syntax of the Deduplication 
statement:
 
 ```sql
 SELECT [column_list]
-FROM (
-   SELECT [column_list],
-     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-       ORDER BY time_attr [asc|desc]) AS rownum
-   FROM table_name)
-WHERE rownum = 1
+FROM table_name
+QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) = 1
 ```
 
 **Parameter Specification:**
diff --git a/docs/content/docs/dev/table/sql/queries/topn.md 
b/docs/content/docs/dev/table/sql/queries/topn.md
index d4652b7a1f7..e374c3cca4e 100644
--- a/docs/content/docs/dev/table/sql/queries/topn.md
+++ b/docs/content/docs/dev/table/sql/queries/topn.md
@@ -33,13 +33,11 @@ Flink uses the combination of a OVER window clause and a 
filter condition to exp
 The following shows the syntax of the Top-N statement:
 
 ```sql
-SELECT [column_list]
-FROM (
-   SELECT [column_list],
-     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
-   FROM table_name)
-WHERE rownum <= N [AND conditions]
+SELECT [column_list],
+    ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) AS rownum
+FROM table_name
+QUALIFY rownum <= N
+[WHERE conditions]
 ```
 
 **Parameter Specification:**
diff --git a/flink-table/flink-sql-parser/pom.xml 
b/flink-table/flink-sql-parser/pom.xml
index 4e7c463db78..866fa4f97e3 100644
--- a/flink-table/flink-sql-parser/pom.xml
+++ b/flink-table/flink-sql-parser/pom.xml
@@ -62,9 +62,9 @@ under the License.
                        <version>${calcite.version}</version>
                        <exclusions>
                                <!--
-                               "mvn dependency:tree" as of Calcite 1.33.0:
+                               "mvn dependency:tree" as of Calcite 1.34.0:
 
-                               [INFO] +- 
org.apache.calcite:calcite-core:jar:1.33.0:compile
+                               [INFO] +- 
org.apache.calcite:calcite-core:jar:1.34.0:compile
                                [INFO] |  +- 
org.apache.calcite.avatica:avatica-core:jar:1.23.0:compile
                                [INFO] |  +- 
org.apiguardian:apiguardian-api:jar:1.1.2:compile
                                [INFO] |  +- 
org.checkerframework:checker-qual:jar:3.12.0:compile
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 2a366116e93..ff10d47109a 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -287,6 +287,8 @@
     "CURSOR_NAME"
     "DATA"
     "DATABASE"
+    "DATE_DIFF"
+    "DATETIME_DIFF"
     "DATETIME_INTERVAL_CODE"
     "DATETIME_INTERVAL_PRECISION"
     "DAYS"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj 
b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
index 57c5eec2373..9de00ec3dcb 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
+++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
@@ -1311,6 +1311,7 @@ SqlSelect SqlSelect() :
     final SqlNodeList groupBy;
     final SqlNode having;
     final SqlNodeList windowDecls;
+    final SqlNode qualify;
     final List<SqlNode> hints = new ArrayList<SqlNode>();
     final Span s;
 }
@@ -1337,6 +1338,7 @@ SqlSelect SqlSelect() :
         ( groupBy = GroupBy() | { groupBy = null; } )
         ( having = Having() | { having = null; } )
         ( windowDecls = Window() | { windowDecls = null; } )
+        ( qualify = Qualify() | { qualify = null; } )
     |
         E() {
             fromClause = null;
@@ -1344,13 +1346,14 @@ SqlSelect SqlSelect() :
             groupBy = null;
             having = null;
             windowDecls = null;
+            qualify = null;
         }
     )
     {
         return new SqlSelect(s.end(this), keywordList,
             new SqlNodeList(selectList, Span.of(selectList).pos()),
-            fromClause, where, groupBy, having, windowDecls, null, null, null,
-            new SqlNodeList(hints, getPos()));
+            fromClause, where, groupBy, having, windowDecls, qualify,
+            null, null, null, new SqlNodeList(hints, getPos()));
     }
 }
 
@@ -2773,6 +2776,15 @@ SqlNode WindowRange() :
     )
 }
 
+/** Parses a QUALIFY clause for SELECT. */
+SqlNode Qualify() :
+{
+    SqlNode e;
+}
+{
+    <QUALIFY> e = Expression(ExprContext.ACCEPT_SUB_QUERY) { return e; }
+}
+
 /**
  * Parses an ORDER BY clause.
  */
@@ -4033,6 +4045,7 @@ SqlNode AtomicRowExpression() :
 }
 {
     (
+        LOOKAHEAD(2)
         e = LiteralOrIntervalExpression()
     |
         e = DynamicParam()
@@ -4629,6 +4642,28 @@ SqlLiteral DateTimeLiteral() :
     }
 }
 
+/** Parses a Date/Time constructor function, for example "DATE(1969, 7, 21)"
+ * or "DATETIME(d, t)". Enabled in some libraries (e.g. BigQuery). */
+SqlNode DateTimeConstructorCall() :
+{
+    final SqlFunctionCategory funcType = SqlFunctionCategory.TIMEDATE;
+    final SqlIdentifier qualifiedName;
+    final Span s;
+    final SqlLiteral quantifier;
+    final List<? extends SqlNode> args;
+}
+{
+    (<DATE> | <TIME> | <DATETIME> | <TIMESTAMP>) {
+        s = span();
+        qualifiedName = new SqlIdentifier(unquotedIdentifier(), getPos());
+    }
+    args = FunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+        quantifier = (SqlLiteral) args.get(0);
+        args.remove(0);
+        return createCall(qualifiedName, s.end(this), funcType, quantifier, 
args);
+    }
+}
+
 /** Parses a MULTISET constructor */
 SqlNode MultisetConstructor() :
 {
@@ -5047,13 +5082,42 @@ SqlIntervalQualifier IntervalQualifierStart() :
  * registered as abbreviations in your time frame set.
  */
 SqlIntervalQualifier TimeUnitOrName() : {
-    final Span span;
-    final String w;
-    final TimeUnit unit;
     final SqlIdentifier unitName;
+    final SqlIntervalQualifier intervalQualifier;
 }
 {
+    // When we see a time unit that is also a non-reserved keyword, such as
+    // NANOSECOND, there is a choice between using the TimeUnit enum
+    // (TimeUnit.NANOSECOND) or the name. The following LOOKAHEAD directive
+    // tells the parser that we prefer the former.
+    //
+    // Reserved keywords, such as SECOND, cannot be identifiers, and are
+    // therefore not ambiguous.
     LOOKAHEAD(2)
+    intervalQualifier = TimeUnit() {
+        return intervalQualifier;
+    }
+|   unitName = SimpleIdentifier() {
+        return new SqlIntervalQualifier(unitName.getSimple(),
+            unitName.getParserPosition());
+    }
+}
+
+/** Parses a built-in time unit (e.g. "YEAR")
+ * and returns a {@link SqlIntervalQualifier}.
+ *
+ * <p>Includes {@code WEEK} and {@code WEEK(SUNDAY)} through
+  {@code WEEK(SATURDAY)}.
+ *
+ * <p>Does not include SQL_TSI_DAY, SQL_TSI_FRAC_SECOND etc. These will be
+ * parsed as identifiers and can be resolved in the validator if they are
+ * registered as abbreviations in your time frame set.
+ */
+SqlIntervalQualifier TimeUnit() : {
+    final Span span;
+    final String w;
+}
+{
     <NANOSECOND> { return new SqlIntervalQualifier(TimeUnit.NANOSECOND, null, 
getPos()); }
 |   <MICROSECOND> { return new SqlIntervalQualifier(TimeUnit.MICROSECOND, 
null, getPos()); }
 |   <MILLISECOND> { return new SqlIntervalQualifier(TimeUnit.MILLISECOND, 
null, getPos()); }
@@ -5067,6 +5131,8 @@ SqlIntervalQualifier TimeUnitOrName() : {
 |   <ISOYEAR> { return new SqlIntervalQualifier(TimeUnit.ISOYEAR, null, 
getPos()); }
 |   <WEEK> { span = span(); }
     (
+        // There is a choice between "WEEK(weekday)" and "WEEK". We prefer
+        // the former, and the parser will look ahead for '('.
         LOOKAHEAD(2)
         <LPAREN> w = weekdayName() <RPAREN> {
             return new SqlIntervalQualifier(w, span.end(this));
@@ -5081,10 +5147,6 @@ SqlIntervalQualifier TimeUnitOrName() : {
 |   <DECADE> { return new SqlIntervalQualifier(TimeUnit.DECADE, null, 
getPos()); }
 |   <CENTURY> { return new SqlIntervalQualifier(TimeUnit.CENTURY, null, 
getPos()); }
 |   <MILLENNIUM> { return new SqlIntervalQualifier(TimeUnit.MILLENNIUM, null, 
getPos()); }
-|   unitName = SimpleIdentifier() {
-        return new SqlIntervalQualifier(unitName.getSimple(),
-            unitName.getParserPosition());
-    }
 }
 
 String weekdayName() :
@@ -6085,10 +6147,16 @@ SqlNode BuiltinFunctionCall() :
         <RPAREN> {
             return SqlStdOperatorTable.TRIM.createCall(s.end(this), args);
         }
+    |
+        node = DateTimeConstructorCall() { return node; }
+    |
+        node = DateDiffFunctionCall() { return node; }
     |
         node = DateTruncFunctionCall() { return node; }
     |
         node = TimestampAddFunctionCall() { return node; }
+    |
+        node = DatetimeDiffFunctionCall() { return node; }
     |
         node = TimestampDiffFunctionCall() { return node; }
     |
@@ -6603,6 +6671,28 @@ SqlCall JsonArrayAggFunctionCall() :
     }
 }
 
+/**
+ * Parses a call to BigQuery's DATE_DIFF.
+ */
+SqlCall DateDiffFunctionCall() :
+{
+    final List<SqlNode> args = new ArrayList<SqlNode>();
+    final Span s;
+    final SqlIntervalQualifier unit;
+}
+{
+    <DATE_DIFF> { s = span(); }
+    <LPAREN>
+    AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+    <COMMA>
+    AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+    <COMMA>
+    unit = TimeUnitOrName() { args.add(unit); }
+    <RPAREN> {
+        return SqlLibraryOperators.DATE_DIFF.createCall(s.end(this), args);
+    }
+}
+
 /**
  * Parses a call to TIMESTAMPADD.
  */
@@ -6676,6 +6766,28 @@ SqlCall TimestampDiff3FunctionCall() :
     }
 }
 
+/**
+ * Parses BigQuery's built-in DATETIME_DIFF() function.
+ */
+SqlCall DatetimeDiffFunctionCall() :
+{
+    final List<SqlNode> args = new ArrayList<SqlNode>();
+    final Span s;
+    final SqlIntervalQualifier unit;
+}
+{
+    <DATETIME_DIFF> { s = span(); }
+    <LPAREN>
+    AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+    <COMMA>
+    AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
+    <COMMA>
+    unit = TimeUnitOrName() { args.add(unit); }
+    <RPAREN> {
+        return SqlLibraryOperators.DATETIME_DIFF.createCall(s.end(this), args);
+    }
+}
+
 /**
  * Parses a call to DATE_TRUNC.
  */
@@ -6694,7 +6806,8 @@ SqlCall DateTruncFunctionCall() :
     // the BigQuery variant, e.g. "DATE_TRUNC(d, YEAR)",
     // and the Redshift variant, e.g. "DATE_TRUNC('year', DATE '2008-09-08')".
     (
-        unit = TimeUnitOrName() { args.add(unit); }
+        LOOKAHEAD(2)
+        unit = TimeUnit() { args.add(unit); }
     |
         AddExpression(args, ExprContext.ACCEPT_SUB_QUERY)
     )
@@ -7252,6 +7365,18 @@ SqlNode JdbcFunctionCall() :
         s = span();
     }
     (
+        LOOKAHEAD(1)
+        call = DateDiffFunctionCall() {
+            name = call.getOperator().getName();
+            args = new SqlNodeList(call.getOperandList(), getPos());
+        }
+    |
+        LOOKAHEAD(1)
+        call = DatetimeDiffFunctionCall() {
+            name = call.getOperator().getName();
+            args = new SqlNodeList(call.getOperandList(), getPos());
+        }
+    |
         LOOKAHEAD(1)
         call = TimestampAddFunctionCall() {
             name = call.getOperator().getName();
@@ -7658,8 +7783,10 @@ SqlPostfixOperator PostfixRowOperator() :
 |   < DATE: "DATE" >
 |   < DATE_TRUNC: "DATE_TRUNC" >
 |   < DATETIME: "DATETIME" >
+|   < DATETIME_DIFF: "DATETIME_DIFF" >
 |   < DATETIME_INTERVAL_CODE: "DATETIME_INTERVAL_CODE" >
 |   < DATETIME_INTERVAL_PRECISION: "DATETIME_INTERVAL_PRECISION" >
+|   < DATE_DIFF: "DATE_DIFF" >
 |   < DAY: "DAY" >
 |   < DAYS: "DAYS" >
 |   < DEALLOCATE: "DEALLOCATE" >
@@ -7958,6 +8085,7 @@ SqlPostfixOperator PostfixRowOperator() :
 |   < PRIVILEGES: "PRIVILEGES" >
 |   < PROCEDURE: "PROCEDURE" >
 |   < PUBLIC: "PUBLIC" >
+|   < QUALIFY: "QUALIFY" >
 |   < QUARTER: "QUARTER" >
 |   < QUARTERS: "QUARTERS" >
 |   < RANGE: "RANGE" >
diff --git a/flink-table/flink-table-calcite-bridge/pom.xml 
b/flink-table/flink-table-calcite-bridge/pom.xml
index 3c57ecbaab1..9b081d688ed 100644
--- a/flink-table/flink-table-calcite-bridge/pom.xml
+++ b/flink-table/flink-table-calcite-bridge/pom.xml
@@ -45,9 +45,9 @@ under the License.
                        <version>${calcite.version}</version>
                        <exclusions>
                                <!--
-                               "mvn dependency:tree" as of Calcite 1.33.0:
-                               [INFO] +- 
org.apache.calcite:calcite-core:jar:1.33.0:compile
-                               [INFO] |  +- 
org.apache.calcite:calcite-linq4j:jar:1.33.0:compile
+                               "mvn dependency:tree" as of Calcite 1.34.0:
+                               [INFO] +- 
org.apache.calcite:calcite-core:jar:1.34.0:compile
+                               [INFO] |  +- 
org.apache.calcite:calcite-linq4j:jar:1.34.0:compile
                                [INFO] |  +- 
org.locationtech.jts:jts-core:jar:1.19.0:compile
                                [INFO] |  +- 
com.fasterxml.jackson.core:jackson-annotations:jar:2.15.3:compile
                                [INFO] |  +- 
org.apache.calcite.avatica:avatica-core:jar:1.23.0:compile
@@ -60,7 +60,7 @@ under the License.
                                [INFO] |  |        \- 
org.ow2.asm:asm:jar:9.1:runtime
                                [INFO] |  +- 
commons-codec:commons-codec:jar:1.15:runtime
                                [INFO] |  +- 
org.apache.commons:commons-math3:jar:3.6.1:runtime
-                               [INFO] |  \- 
commons-io:commons-io:jar:2.11.0:runtime
+                               [INFO] |  \- 
commons-io:commons-io:jar:2.15.1:runtime
 
                                Dependencies that are not needed for how we use 
Calcite right now.
                                -->
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 097a9f2316f..d878af1c7f7 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -499,9 +499,12 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         inferUnknownTypes(targetType, selectScope, expanded);
 
         RelDataType type = deriveType(selectScope, expanded);
-        // Re-derive SELECT ITEM's data type that may be nullable in 
AggregatingSelectScope when it
-        // appears in advanced grouping elements such as CUBE, ROLLUP , 
GROUPING SETS.
-        // For example, SELECT CASE WHEN c = 1 THEN '1' ELSE '23' END AS x 
FROM t GROUP BY CUBE(x),
+        // Re-derive SELECT ITEM's data type that may be nullable in
+        // AggregatingSelectScope when it appears in advanced grouping 
elements such
+        // as CUBE, ROLLUP, GROUPING SETS. For example, in
+        //   SELECT CASE WHEN c = 1 THEN '1' ELSE '23' END AS x
+        //   FROM t
+        //   GROUP BY CUBE(x)
         // the 'x' should be nullable even if x's literal values are not null.
         if (selectScope instanceof AggregatingSelectScope) {
             type = requireNonNull(selectScope.nullifyType(stripAs(expanded), 
type));
@@ -1459,6 +1462,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                             null,
                             null,
                             null,
+                            null,
                             orderList,
                             orderBy.offset,
                             orderBy.fetch,
@@ -1483,6 +1487,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                             null,
                             null,
                             null,
+                            null,
                             null);
                 }
 
@@ -1590,6 +1595,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                         null,
                         null,
                         null,
+                        null,
                         null);
         call.setSourceSelect(select);
 
@@ -1616,6 +1622,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                             null,
                             null,
                             null,
+                            null,
                             null);
             insertCall.setSource(select);
         }
@@ -1678,6 +1685,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                         null,
                         null,
                         null,
+                        null,
                         null);
         source = SqlValidatorUtil.addAlias(source, UPDATE_SRC_ALIAS);
         SqlMerge mergeCall =
@@ -1744,6 +1752,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                 null,
                 null,
                 null,
+                null,
                 null);
     }
 
@@ -1774,6 +1783,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                 null,
                 null,
                 null,
+                null,
                 null);
     }
 
@@ -2785,6 +2795,9 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                 clauseScopes.put(IdPair.of(select, Clause.WHERE), selectScope);
                 registerOperandSubQueries(selectScope, select, 
SqlSelect.WHERE_OPERAND);
 
+                // Register subqueries in the QUALIFY clause
+                registerOperandSubQueries(selectScope, select, 
SqlSelect.QUALIFY_OPERAND);
+
                 // Register FROM with the inherited scope 'parentScope', not
                 // 'selectScope', otherwise tables in the FROM clause would be
                 // able to see each other.
@@ -3599,12 +3612,11 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         String name = id.names.get(0);
         SqlNameMatcher nameMatcher = getCatalogReader().nameMatcher();
         RelDataType rowType = getNamespaceOrThrow(node).getRowType();
-        RelDataType colType =
+        final RelDataTypeField field =
                 requireNonNull(
-                                nameMatcher.field(rowType, name),
-                                () -> "unable to find left field " + name + " 
in " + rowType)
-                        .getType();
-        return colType;
+                        nameMatcher.field(rowType, name),
+                        () -> "unable to find left field " + name + " in " + 
rowType);
+        return field.getType();
     }
 
     /**
@@ -3710,6 +3722,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         validateGroupClause(select);
         validateHavingClause(select);
         validateWindowClause(select);
+        validateQualifyClause(select);
         handleOffsetFetch(select.getOffset(), select.getFetch());
 
         // Validate the SELECT clause late, because a select item might
@@ -4158,6 +4171,34 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         windowList.validate(this, windowScope);
     }
 
+    protected void validateQualifyClause(SqlSelect select) {
+        SqlNode qualifyNode = select.getQualify();
+        if (qualifyNode == null) {
+            return;
+        }
+
+        SqlValidatorScope qualifyScope = getSelectScope(select);
+
+        qualifyNode = extendedExpand(qualifyNode, qualifyScope, select, 
Clause.QUALIFY);
+        select.setQualify(qualifyNode);
+
+        inferUnknownTypes(booleanType, qualifyScope, qualifyNode);
+
+        qualifyNode.validate(this, qualifyScope);
+
+        final RelDataType type = deriveType(qualifyScope, qualifyNode);
+        if (!SqlTypeUtil.inBooleanFamily(type)) {
+            throw newValidationError(qualifyNode, 
RESOURCE.condMustBeBoolean("QUALIFY"));
+        }
+
+        boolean qualifyContainsWindowFunction = 
overFinder.findAgg(qualifyNode) != null;
+        if (!qualifyContainsWindowFunction) {
+            throw newValidationError(
+                    qualifyNode,
+                    
RESOURCE.qualifyExpressionMustContainWindowFunction(qualifyNode.toString()));
+        }
+    }
+
     @Override
     public void validateWith(SqlWith with, SqlValidatorScope scope) {
         final SqlValidatorNamespace namespace = getNamespaceOrThrow(with);
@@ -4349,7 +4390,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         // expand the expression in group list.
         List<SqlNode> expandedList = new ArrayList<>();
         for (SqlNode groupItem : groupList) {
-            SqlNode expandedItem = expandGroupByOrHavingExpr(groupItem, 
groupScope, select, false);
+            SqlNode expandedItem = extendedExpand(groupItem, groupScope, 
select, Clause.GROUP_BY);
             expandedList.add(expandedItem);
         }
         groupList = new SqlNodeList(expandedList, 
groupList.getParserPosition());
@@ -4475,7 +4516,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         }
         final AggregatingScope havingScope = (AggregatingScope) 
getSelectScope(select);
         if (config.conformance().isHavingAlias()) {
-            SqlNode newExpr = expandGroupByOrHavingExpr(having, havingScope, 
select, true);
+            SqlNode newExpr = extendedExpand(having, havingScope, select, 
Clause.HAVING);
             if (having != newExpr) {
                 having = newExpr;
                 select.setHaving(newExpr);
@@ -4981,14 +5022,13 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         // matched
         boolean isUpdateModifiableViewTable = false;
         if (query instanceof SqlUpdate) {
-            final SqlNodeList targetColumnList = ((SqlUpdate) 
query).getTargetColumnList();
-            if (targetColumnList != null) {
-                final int targetColumnCnt = targetColumnList.size();
-                targetRowType =
-                        SqlTypeUtil.extractLastNFields(typeFactory, 
targetRowType, targetColumnCnt);
-                sourceRowType =
-                        SqlTypeUtil.extractLastNFields(typeFactory, 
sourceRowType, targetColumnCnt);
-            }
+            final SqlNodeList targetColumnList =
+                    requireNonNull(((SqlUpdate) query).getTargetColumnList());
+            final int targetColumnCount = targetColumnList.size();
+            targetRowType =
+                    SqlTypeUtil.extractLastNFields(typeFactory, targetRowType, 
targetColumnCount);
+            sourceRowType =
+                    SqlTypeUtil.extractLastNFields(typeFactory, sourceRowType, 
targetColumnCount);
             isUpdateModifiableViewTable = 
table.unwrap(ModifiableViewTable.class) != null;
         }
         if (SqlTypeUtil.equalAsStructSansNullability(
@@ -5493,13 +5533,12 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                 node.validate(this, scope);
                 SqlIdentifier identifier;
                 if (node instanceof SqlBasicCall) {
-                    identifier = (SqlIdentifier) ((SqlBasicCall) 
node).operand(0);
+                    identifier = ((SqlBasicCall) node).operand(0);
                 } else {
                     identifier =
-                            (SqlIdentifier)
-                                    requireNonNull(
-                                            node,
-                                            () -> "order by field is null. All 
fields: " + orderBy);
+                            requireNonNull(
+                                    (SqlIdentifier) node,
+                                    () -> "order by field is null. All fields: 
" + orderBy);
                 }
 
                 if (allRows) {
@@ -5721,7 +5760,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
 
     public void validatePivot(SqlPivot pivot) {
         final PivotScope scope =
-                (PivotScope) requireNonNull(getJoinScope(pivot), () -> 
"joinScope for " + pivot);
+                requireNonNull((PivotScope) getJoinScope(pivot), () -> 
"joinScope for " + pivot);
 
         final PivotNamespace ns = 
getNamespaceOrThrow(pivot).unwrap(PivotNamespace.class);
         assert ns.rowType == null;
@@ -6146,9 +6185,10 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         return newExpr;
     }
 
-    public SqlNode expandGroupByOrHavingExpr(
-            SqlNode expr, SqlValidatorScope scope, SqlSelect select, boolean 
havingExpression) {
-        final Expander expander = new ExtendedExpander(this, scope, select, 
expr, havingExpression);
+    /** Expands an expression in a GROUP BY, HAVING or QUALIFY clause. */
+    private SqlNode extendedExpand(
+            SqlNode expr, SqlValidatorScope scope, SqlSelect select, Clause 
clause) {
+        final Expander expander = new ExtendedExpander(this, scope, select, 
expr, clause);
         SqlNode newExpr = expander.go(expr);
         if (expr != newExpr) {
             setOriginal(newExpr, expr);
@@ -6156,6 +6196,10 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         return newExpr;
     }
 
+    public SqlNode extendedExpandGroupBy(SqlNode expr, SqlValidatorScope 
scope, SqlSelect select) {
+        return extendedExpand(expr, scope, select, Clause.GROUP_BY);
+    }
+
     @Override
     public boolean isSystemField(RelDataTypeField field) {
         return false;
@@ -6770,67 +6814,74 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
     static class ExtendedExpander extends Expander {
         final SqlSelect select;
         final SqlNode root;
-        final boolean havingExpr;
+        final Clause clause;
 
         ExtendedExpander(
                 SqlValidatorImpl validator,
                 SqlValidatorScope scope,
                 SqlSelect select,
                 SqlNode root,
-                boolean havingExpr) {
+                Clause clause) {
             super(validator, scope);
             this.select = select;
             this.root = root;
-            this.havingExpr = havingExpr;
+            this.clause = clause;
         }
 
         @Override
         public @Nullable SqlNode visit(SqlIdentifier id) {
-            if (id.isSimple()
-                    && (havingExpr
-                            ? validator.config().conformance().isHavingAlias()
-                            : 
validator.config().conformance().isGroupByAlias())) {
-                String name = id.getSimple();
-                SqlNode expr = null;
-                final SqlNameMatcher nameMatcher = 
validator.catalogReader.nameMatcher();
-                int n = 0;
-                for (SqlNode s : 
SqlNonNullableAccessors.getSelectList(select)) {
-                    final @Nullable String alias = SqlValidatorUtil.alias(s);
-                    if (alias != null && nameMatcher.matches(alias, name)) {
-                        expr = s;
-                        n++;
-                    }
-                }
-                if (n == 0) {
-                    return super.visit(id);
-                } else if (n > 1) {
-                    // More than one column has this alias.
-                    throw validator.newValidationError(id, 
RESOURCE.columnAmbiguous(name));
-                }
-                if (havingExpr && validator.isAggregate(root)) {
-                    return super.visit(id);
-                }
-                expr = stripAs(expr);
-                if (expr instanceof SqlIdentifier) {
-                    SqlIdentifier sid = (SqlIdentifier) expr;
-                    final SqlIdentifier fqId = 
getScope().fullyQualify(sid).identifier;
-                    expr = expandDynamicStar(sid, fqId);
-                }
-                return expr;
+            if (!id.isSimple()) {
+                return super.visit(id);
             }
-            if (id.isSimple()) {
+
+            final boolean replaceAliases = 
clause.shouldReplaceAliases(validator.config);
+            if (!replaceAliases) {
                 final SelectScope scope = validator.getRawSelectScope(select);
                 SqlNode node = expandCommonColumn(select, id, scope, 
validator);
                 if (node != id) {
                     return node;
                 }
+                return super.visit(id);
+            }
+
+            String name = id.getSimple();
+            SqlNode expr = null;
+            final SqlNameMatcher nameMatcher = 
validator.catalogReader.nameMatcher();
+            int n = 0;
+            for (SqlNode s : SqlNonNullableAccessors.getSelectList(select)) {
+                final @Nullable String alias = SqlValidatorUtil.alias(s);
+                if (alias != null && nameMatcher.matches(alias, name)) {
+                    expr = s;
+                    n++;
+                }
+            }
+
+            if (n == 0) {
+                return super.visit(id);
+            } else if (n > 1) {
+                // More than one column has this alias.
+                throw validator.newValidationError(id, 
RESOURCE.columnAmbiguous(name));
+            }
+            Iterable<SqlCall> allAggList = 
validator.aggFinder.findAll(ImmutableList.of(root));
+            for (SqlCall agg : allAggList) {
+                if (clause == Clause.HAVING && containsIdentifier(agg, id)) {
+                    return super.visit(id);
+                }
             }
-            return super.visit(id);
+
+            expr = stripAs(expr);
+            if (expr instanceof SqlIdentifier) {
+                SqlIdentifier sid = (SqlIdentifier) expr;
+                final SqlIdentifier fqId = 
getScope().fullyQualify(sid).identifier;
+                expr = expandDynamicStar(sid, fqId);
+            }
+
+            return expr;
         }
 
         @Override
         public @Nullable SqlNode visit(SqlLiteral literal) {
-            if (havingExpr || 
!validator.config().conformance().isGroupByOrdinal()) {
+            if (clause != Clause.GROUP_BY || 
!validator.config().conformance().isGroupByOrdinal()) {
                 return super.visit(literal);
             }
             boolean isOrdinalLiteral = literal == root;
@@ -6878,6 +6929,32 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
 
             return super.visit(literal);
         }
+
+        /**
+         * Returns whether a given node contains a {@link SqlIdentifier}.
+         *
+         * @param sqlNode a SqlNode
+         * @param target a SqlIdentifier
+         */
+        private boolean containsIdentifier(SqlNode sqlNode, SqlIdentifier 
target) {
+            try {
+                SqlVisitor<Void> visitor =
+                        new SqlBasicVisitor<Void>() {
+                            @Override
+                            public Void visit(SqlIdentifier identifier) {
+                                if (identifier.equalsDeep(target, 
Litmus.IGNORE)) {
+                                    throw new Util.FoundOne(target);
+                                }
+                                return super.visit(identifier);
+                            }
+                        };
+                sqlNode.accept(visitor);
+                return false;
+            } catch (Util.FoundOne e) {
+                Util.swallow(e, null);
+                return true;
+            }
+        }
     }
 
     /** Information about an identifier in a particular scope. */
@@ -7350,6 +7427,52 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         GROUP_BY,
         SELECT,
         ORDER,
-        CURSOR
+        CURSOR,
+        HAVING,
+        QUALIFY;
+
+        /**
+         * Determines if the extender should replace aliases with expanded 
values. For example:
+         *
+         * <blockquote>
+         *
+         * <pre>{@code
+         * SELECT a + a as twoA
+         * GROUP BY twoA
+         * }</pre>
+         *
+         * </blockquote>
+         *
+         * <p>turns into
+         *
+         * <blockquote>
+         *
+         * <pre>{@code
+         * SELECT a + a as twoA
+         * GROUP BY a + a
+         * }</pre>
+         *
+         * </blockquote>
+         *
+         * <p>This is determined both by the clause and the config.
+         *
+         * @param config The configuration
+         * @return Whether we should replace the alias with its expanded value
+         */
+        boolean shouldReplaceAliases(Config config) {
+            switch (this) {
+                case GROUP_BY:
+                    return config.conformance().isGroupByAlias();
+
+                case HAVING:
+                    return config.conformance().isHavingAlias();
+
+                case QUALIFY:
+                    return true;
+
+                default:
+                    throw Util.unexpected(this);
+            }
+        }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index bc13de49927..9e6b25acb13 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -124,7 +124,11 @@ import static java.util.Objects.requireNonNull;
 import static org.apache.calcite.linq4j.Nullness.castNonNull;
 
 /**
- * Copied to fix calcite issues. FLINK modifications are at lines
+ * RelDecorrelator replaces all correlated expressions (corExp) in a 
relational expression (RelNode)
+ * tree with non-correlated expressions that are produced from joining the 
RelNode that produces the
+ * corExp with the RelNode that references it.
+ *
+ * <p>TODO:
  *
  * <ol>
  *   <li>Was changed within FLINK-29280, FLINK-28682, FLINK-35804: Line 218 ~ 
225, Line 273 ~ 288
@@ -1069,17 +1073,11 @@ public class RelDecorrelator implements 
ReflectiveVisitor {
 
         // can directly add positions into corDefOutputs since join
         // does not change the output ordering from the inputs.
-        RelNode valueGen =
-                requireNonNull(
-                        createValueGenerator(corVarList, leftInputOutputCount, 
corDefOutputs),
-                        "createValueGenerator(...) is null");
+        final RelNode valueGen =
+                createValueGenerator(corVarList, leftInputOutputCount, 
corDefOutputs);
+        requireNonNull(valueGen, "valueGen");
 
-        RelNode join =
-                relBuilder
-                        .push(frame.r)
-                        .push(valueGen)
-                        .join(JoinRelType.INNER, relBuilder.literal(true), 
ImmutableSet.of())
-                        .build();
+        RelNode join = 
relBuilder.push(frame.r).push(valueGen).join(JoinRelType.INNER).build();
 
         // Join or Filter does not change the old input ordering. All
         // input fields from newLeftInput (i.e. the original input to the old
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index e1111ef812f..40477e1e2db 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -98,6 +98,7 @@ import org.apache.calcite.rex.RexFieldCollation;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
 import org.apache.calcite.rex.RexPatternFieldRef;
 import org.apache.calcite.rex.RexRangeRef;
 import org.apache.calcite.rex.RexShuttle;
@@ -225,6 +226,7 @@ import java.util.stream.Collectors;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static org.apache.calcite.linq4j.Nullness.castNonNull;
+import static org.apache.calcite.runtime.FlatLists.append;
 import static org.apache.calcite.sql.SqlUtil.stripAs;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -781,6 +783,8 @@ public class SqlToRelConverter {
             convertSelectList(bb, select, orderExprList);
         }
 
+        convertQualify(bb, select.getQualify());
+
         if (select.isDistinct()) {
             distinctify(bb, true);
         }
@@ -895,8 +899,7 @@ public class SqlToRelConverter {
         }
 
         assert rel != null : "rel must not be null, root = " + bb.root;
-        // Usual case: all of the expressions in the SELECT clause are
-        // different.
+        // Usual case: all expressions in the SELECT clause are different.
         final ImmutableBitSet groupSet = 
ImmutableBitSet.range(rel.getRowType().getFieldCount());
         rel = createAggregate(bb, groupSet, ImmutableList.of(groupSet), 
ImmutableList.of());
 
@@ -3369,8 +3372,8 @@ public class SqlToRelConverter {
                         condition,
                         convertJoinType(join.getJoinType()));
         relBuilder.push(joinRel);
-        final RelNode newProjectRel = 
relBuilder.project(relBuilder.fields()).build();
-        bb.setRoot(newProjectRel, false);
+        relBuilder.project(relBuilder.fields());
+        bb.setRoot(relBuilder.build(), false);
     }
 
     private RexNode convertNaturalCondition(
@@ -3388,8 +3391,8 @@ public class SqlToRelConverter {
             SqlValidatorNamespace leftNamespace,
             SqlValidatorNamespace rightNamespace) {
         final SqlNodeList list =
-                (SqlNodeList)
-                        requireNonNull(join.getCondition(), () -> 
"getCondition for join " + join);
+                requireNonNull(
+                        (SqlNodeList) join.getCondition(), () -> "getCondition 
for join " + join);
         return convertUsing(
                 leftNamespace,
                 rightNamespace,
@@ -4317,10 +4320,9 @@ public class SqlToRelConverter {
 
     private RelNode convertDelete(SqlDelete call) {
         RelOptTable targetTable = getTargetTable(call);
-        RelNode sourceRel =
-                convertSelect(
-                        requireNonNull(call.getSourceSelect(), () -> 
"sourceSelect for " + call),
-                        false);
+        final SqlSelect sourceSelect =
+                requireNonNull(call.getSourceSelect(), () -> "sourceSelect for 
" + call);
+        RelNode sourceRel = convertSelect(sourceSelect, false);
         return LogicalTableModify.create(
                 targetTable,
                 catalogReader,
@@ -4332,11 +4334,9 @@ public class SqlToRelConverter {
     }
 
     private RelNode convertUpdate(SqlUpdate call) {
-        final SqlValidatorScope scope =
-                validator()
-                        .getWhereScope(
-                                requireNonNull(
-                                        call.getSourceSelect(), () -> 
"sourceSelect for " + call));
+        final SqlSelect sourceSelect =
+                requireNonNull(call.getSourceSelect(), () -> "sourceSelect for 
" + call);
+        final SqlValidatorScope scope = 
validator().getWhereScope(sourceSelect);
         Blackboard bb = createBlackboard(scope, null, false);
 
         replaceSubQueries(bb, call, RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
@@ -4351,14 +4351,11 @@ public class SqlToRelConverter {
             RelDataTypeField field =
                     SqlValidatorUtil.getTargetField(
                             targetRowType, typeFactory, id, catalogReader, 
targetTable);
-            assert field != null : "column " + id.toString() + " not found";
+            assert field != null : "column " + id + " not found";
             targetColumnNameList.add(field.getName());
         }
 
-        RelNode sourceRel =
-                convertSelect(
-                        requireNonNull(call.getSourceSelect(), () -> 
"sourceSelect for " + call),
-                        false);
+        RelNode sourceRel = convertSelect(sourceSelect, false);
 
         bb.setRoot(sourceRel, false);
         ImmutableList.Builder<RexNode> rexNodeSourceExpressionListBuilder = 
ImmutableList.builder();
@@ -4404,10 +4401,9 @@ public class SqlToRelConverter {
 
         // first, convert the merge's source select to construct the columns
         // from the target table and the set expressions in the update call
-        RelNode mergeSourceRel =
-                convertSelect(
-                        requireNonNull(call.getSourceSelect(), () -> 
"sourceSelect for " + call),
-                        false);
+        final SqlSelect sourceSelect =
+                requireNonNull(call.getSourceSelect(), () -> "sourceSelect for 
" + call);
+        RelNode mergeSourceRel = convertSelect(sourceSelect, false);
 
         // then, convert the insert statement so we can get the insert
         // values expressions
@@ -4684,6 +4680,10 @@ public class SqlToRelConverter {
         selectList = validator().expandStar(selectList, select, false);
 
         replaceSubQueries(bb, selectList, RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
+        replaceSubQueries(
+                bb,
+                new SqlNodeList(orderList, SqlParserPos.ZERO),
+                RelOptUtil.Logic.TRUE_FALSE_UNKNOWN);
 
         List<String> fieldNames = new ArrayList<>();
         final List<RexNode> exprs = new ArrayList<>();
@@ -4780,6 +4780,118 @@ public class SqlToRelConverter {
         return alias;
     }
 
+    private void convertQualify(Blackboard bb, @Nullable SqlNode qualify) {
+        if (qualify == null) {
+            return;
+        }
+
+        final LogicalProject projectionFromSelect =
+                requireNonNull((LogicalProject) bb.root, "root");
+
+        // Convert qualify SqlNode to a RexNode
+        replaceSubQueries(bb, qualify, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
+        final RelNode originalRoot = requireNonNull(bb.root, "root");
+        RexNode qualifyRexNode;
+        try {
+            // Set the root to the input of the project,
+            // since QUALIFY might have an expression in the OVER clause
+            // that references a column not in the SELECT.
+            bb.setRoot(projectionFromSelect.getInput(), false);
+            qualifyRexNode = bb.convertExpression(qualify);
+        } finally {
+            bb.setRoot(originalRoot, false);
+        }
+
+        // Check to see if the qualify expression has a referenced expression 
and
+        // do some referencing accordingly
+        final RexNode qualifyWithReferencesRexNode =
+                qualifyRexNode.accept(new 
DuplicateEliminator(projectionFromSelect.getProjects()));
+
+        // Create a Project with the QUALIFY expression
+        if (qualifyWithReferencesRexNode.equals(qualifyRexNode)) {
+            // The QUALIFY expression does not depend on any references like 
so:
+            //
+            //  SELECT A, B
+            //  FROM tbl
+            //  QUALIFY WINDOW(C) = 1
+            //
+            // Meaning we should generate a plan like:
+            //  Project(A, B, WINDOW(C) = 1 as QualifyExpression)
+            //    TableScan(tbl)
+            //
+            relBuilder
+                    .push(projectionFromSelect.getInput())
+                    .project(
+                            append(projectionFromSelect.getProjects(), 
qualifyRexNode),
+                            append(
+                                    
projectionFromSelect.getRowType().getFieldNames(),
+                                    "QualifyExpression"));
+        } else {
+            // The QUALIFY expression depended on a reference meaning
+            // we need to introduce an extra project like so:
+            //
+            //  SELECT A, B, WINDOW(C) as window_val
+            //  FROM tbl
+            //  QUALIFY window_val = 1
+            //
+            // Meaning we should generate a plan like:
+            //
+            //  Project($0, $1, $2, =($2, 1) as QualifyExpression)
+            //    Project(A, B, WINDOW(C) as window_val)
+            //      TableScan(tbl)
+            //
+            // This is a very specific application of Common Subexpression 
Elimination
+            // (CSE), since the window value pops up twice.
+            relBuilder
+                    .push(requireNonNull(bb.root, "root"))
+                    .project(
+                            append(relBuilder.fields(), 
qualifyWithReferencesRexNode),
+                            append(
+                                    
relBuilder.peek().getRowType().getFieldNames(),
+                                    "QualifyExpression"));
+        }
+
+        // Filter on that extra column
+        relBuilder.filter(Util.last(relBuilder.fields()));
+
+        // Remove that extra column from the projection
+        relBuilder.project(
+                Util.first(relBuilder.fields(), 
projectionFromSelect.getProjects().size()));
+
+        // Update the root
+        bb.setRoot(relBuilder.build(), false);
+    }
+
+    /**
+     * Eliminates a common sub-expression by looking for a {@link RexNode} in 
the expressions of a
+     * {@link Project}; if found, returns a refIndex instead of the raw node.
+     */
+    private static final class DuplicateEliminator extends RexShuttle {
+        private final List<RexNode> projects;
+
+        DuplicateEliminator(List<RexNode> projects) {
+            this.projects = projects;
+        }
+
+        @Override
+        public RexNode visitCall(RexCall call) {
+            final int i = projects.indexOf(call);
+            if (i >= 0) {
+                return new RexInputRef(i, projects.get(i).getType());
+            }
+            return super.visitCall(call);
+        }
+
+        @Override
+        public RexNode visitOver(RexOver over) {
+            final int i = projects.indexOf(over);
+            if (i >= 0) {
+                return new RexInputRef(i, projects.get(i).getType());
+            }
+            return over;
+        }
+    }
+
     /** Converts a WITH sub-query into a relational expression. */
     public RelRoot convertWith(SqlWith with, boolean top) {
         return convertQuery(with.body, false, top);
@@ -6740,15 +6852,23 @@ public class SqlToRelConverter {
         Config withExplain(boolean explain);
 
         /**
-         * Returns the {@code expand} option. Controls whether to expand 
sub-queries. If false, each
-         * sub-query becomes a {@link org.apache.calcite.rex.RexSubQuery}.
+         * Returns the {@code expand} option. Controls whether to expand 
sub-queries. If false (the
+         * default), each sub-query becomes a {@link 
org.apache.calcite.rex.RexSubQuery}.
+         *
+         * <p>Setting {@code expand} to true is deprecated. Expansion still 
works, but there will be
+         * less development effort in that area.
          */
         @Value.Default
         default boolean isExpand() {
-            return true;
+            return false;
         }
 
-        /** Sets {@link #isExpand()}. */
+        /**
+         * Sets {@link #isExpand()}.
+         *
+         * <p>Expansion is deprecated. We recommend that you do not call this 
method, and use the
+         * default value of {@link #isExpand()}, false.
+         */
         Config withExpand(boolean expand);
 
         /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java
index 71580fb2ea9..ce40f6bbeab 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/StandardConvertletTable.java
@@ -93,6 +93,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -105,7 +106,7 @@ import static org.apache.calcite.util.Util.first;
  * <p>FLINK modifications are at lines
  *
  * <ol>
- *   <li>Added in Flink-35216: Lines 712 ~ 757
+ *   <li>Added in Flink-35216: Lines 731 ~ 776
  * </ol>
  */
 public class StandardConvertletTable extends ReflectiveConvertletTable {
@@ -154,6 +155,19 @@ public class StandardConvertletTable extends 
ReflectiveConvertletTable {
                     }
                 });
 
+        // DATE(string) is equivalent to CAST(string AS DATE),
+        // but other DATE variants are treated as regular functions.
+        registerOp(
+                SqlLibraryOperators.DATE,
+                (cx, call) -> {
+                    final RexCall e = (RexCall) 
StandardConvertletTable.this.convertCall(cx, call);
+                    if (e.getOperands().size() == 1
+                            && 
SqlTypeUtil.isString(e.getOperands().get(0).getType())) {
+                        return cx.getRexBuilder().makeCast(e.type, 
e.getOperands().get(0));
+                    }
+                    return e;
+                });
+
         registerOp(SqlLibraryOperators.LTRIM, new 
TrimConvertlet(SqlTrimFunction.Flag.LEADING));
         registerOp(SqlLibraryOperators.RTRIM, new 
TrimConvertlet(SqlTrimFunction.Flag.TRAILING));
 
@@ -166,7 +180,12 @@ public class StandardConvertletTable extends 
ReflectiveConvertletTable {
         registerOp(
                 SqlLibraryOperators.SUBSTR_POSTGRESQL, new 
SubstrConvertlet(SqlLibrary.POSTGRESQL));
 
+        registerOp(SqlLibraryOperators.DATE_ADD, new TimestampAddConvertlet());
+        registerOp(SqlLibraryOperators.DATE_DIFF, new 
TimestampDiffConvertlet());
         registerOp(SqlLibraryOperators.DATE_SUB, new TimestampSubConvertlet());
+        registerOp(SqlLibraryOperators.DATETIME_ADD, new 
TimestampAddConvertlet());
+        registerOp(SqlLibraryOperators.DATETIME_DIFF, new 
TimestampDiffConvertlet());
+        registerOp(SqlLibraryOperators.DATETIME_SUB, new 
TimestampSubConvertlet());
         registerOp(SqlLibraryOperators.TIME_ADD, new TimestampAddConvertlet());
         registerOp(SqlLibraryOperators.TIME_DIFF, new 
TimestampDiffConvertlet());
         registerOp(SqlLibraryOperators.TIME_SUB, new TimestampSubConvertlet());
@@ -1377,15 +1396,11 @@ public class StandardConvertletTable extends 
ReflectiveConvertletTable {
 
         private static SqlNode getCastedSqlNode(
                 SqlNode argInput, RelDataType varType, SqlParserPos pos, 
@Nullable RexNode argRex) {
-            SqlNode arg;
-            if (argRex != null && !argRex.getType().equals(varType)) {
-                arg =
-                        SqlStdOperatorTable.CAST.createCall(
-                                pos, argInput, 
SqlTypeUtil.convertTypeToSpec(varType));
-            } else {
-                arg = argInput;
+            if (argRex == null || argRex.getType().equals(varType)) {
+                return argInput;
             }
-            return arg;
+            return SqlStdOperatorTable.CAST.createCall(
+                    pos, argInput, SqlTypeUtil.convertTypeToSpec(varType));
         }
     }
 
@@ -1523,15 +1538,11 @@ public class StandardConvertletTable extends 
ReflectiveConvertletTable {
 
         private static SqlNode getCastedSqlNode(
                 SqlNode argInput, RelDataType varType, SqlParserPos pos, 
@Nullable RexNode argRex) {
-            SqlNode arg;
-            if (argRex != null && !argRex.getType().equals(varType)) {
-                arg =
-                        SqlStdOperatorTable.CAST.createCall(
-                                pos, argInput, 
SqlTypeUtil.convertTypeToSpec(varType));
-            } else {
-                arg = argInput;
+            if (argRex == null || argRex.getType().equals(varType)) {
+                return argInput;
             }
-            return arg;
+            return SqlStdOperatorTable.CAST.createCall(
+                    pos, argInput, SqlTypeUtil.convertTypeToSpec(varType));
         }
     }
 
@@ -1797,6 +1808,9 @@ public class StandardConvertletTable extends 
ReflectiveConvertletTable {
         public RexNode convertCall(SqlRexContext cx, SqlCall call) {
             // TIMESTAMPADD(unit, count, timestamp)
             //  => timestamp + count * INTERVAL '1' UNIT
+            // TIMESTAMP_ADD(timestamp, interval)
+            //  => timestamp + interval
+            // "timestamp" may be of type TIMESTAMP or TIMESTAMP WITH LOCAL 
TIME ZONE.
             final RexBuilder rexBuilder = cx.getRexBuilder();
             SqlIntervalQualifier qualifier;
             final RexNode op1;
@@ -1911,34 +1925,79 @@ public class StandardConvertletTable extends 
ReflectiveConvertletTable {
     private static class TimestampDiffConvertlet implements SqlRexConvertlet {
         @Override
         public RexNode convertCall(SqlRexContext cx, SqlCall call) {
+            // The standard TIMESTAMPDIFF and BigQuery's TIMESTAMP_DIFF have 
two key
+            // differences. The first being the order of the subtraction, 
outlined
+            // below. The second is that BigQuery truncates each timestamp to 
the
+            // specified time unit before the difference is computed.
+            //
+            // In fact, all BigQuery functions (TIMESTAMP_DIFF, DATETIME_DIFF,
+            // DATE_DIFF) truncate before subtracting when applied to date 
intervals
+            // (DAY, WEEK, ISOWEEK, MONTH, YEAR, etc.)
+            //
+            // For example, if computing the number of weeks between two 
timestamps,
+            // one occurring on a Saturday and the other occurring the next 
day on
+            // Sunday, their week difference is 1. This is because the first 
timestamp
+            // is truncated to the previous Sunday. This is done by making 
calls to
+            // TIMESTAMP_TRUNC and the difference is then computed using their
+            // results.
+            //
             // TIMESTAMPDIFF(unit, t1, t2)
             //    => (t2 - t1) UNIT
             // TIMESTAMP_DIFF(t1, t2, unit)
             //    => (t1 - t2) UNIT
             SqlIntervalQualifier qualifier;
+            final boolean preTruncate;
             final RexNode op1;
             final RexNode op2;
             if (call.operand(0).getKind() == SqlKind.INTERVAL_QUALIFIER) {
                 qualifier = call.operand(0);
+                preTruncate = false;
                 op1 = cx.convertExpression(call.operand(1));
                 op2 = cx.convertExpression(call.operand(2));
             } else {
                 qualifier = call.operand(2);
+                preTruncate = qualifier.isDate();
                 op1 = cx.convertExpression(call.operand(1));
                 op2 = cx.convertExpression(call.operand(0));
             }
             final RexBuilder rexBuilder = cx.getRexBuilder();
+            final RelDataTypeFactory typeFactory = cx.getTypeFactory();
             final TimeFrame timeFrame = 
cx.getValidator().validateTimeFrame(qualifier);
             final TimeUnit unit = first(timeFrame.unit(), TimeUnit.EPOCH);
+            UnaryOperator<RexNode> truncateFn = UnaryOperator.identity();
 
             if (unit == TimeUnit.EPOCH && qualifier.timeFrameName != null) {
                 // Custom time frames have a different path. They are kept as 
names, and
                 // then handled by Java functions.
                 final RexLiteral timeFrameName = 
rexBuilder.makeLiteral(qualifier.timeFrameName);
+                // This additional logic accounts for BigQuery truncating 
prior to
+                // computing the difference.
+                if (preTruncate) {
+                    truncateFn =
+                            e ->
+                                    rexBuilder.makeCall(
+                                            e.getType(),
+                                            
SqlLibraryOperators.TIMESTAMP_TRUNC,
+                                            ImmutableList.of(e, 
timeFrameName));
+                }
                 return rexBuilder.makeCall(
                         cx.getValidator().getValidatedNodeType(call),
                         SqlStdOperatorTable.TIMESTAMP_DIFF,
-                        ImmutableList.of(timeFrameName, op1, op2));
+                        ImmutableList.of(
+                                timeFrameName, truncateFn.apply(op1), 
truncateFn.apply(op2)));
+            }
+
+            if (preTruncate) {
+                // The timestamps should be truncated unless the time unit is 
HOUR, in
+                // which case only the whole number of hours between the 
timestamps
+                // should be returned.
+                final RexNode timeUnit = cx.convertExpression(qualifier);
+                truncateFn =
+                        e ->
+                                rexBuilder.makeCall(
+                                        e.getType(),
+                                        SqlLibraryOperators.TIMESTAMP_TRUNC,
+                                        ImmutableList.of(e, timeUnit));
             }
 
             BigDecimal multiplier = BigDecimal.ONE;
@@ -1968,23 +2027,21 @@ public class StandardConvertletTable extends 
ReflectiveConvertletTable {
                     qualifier = new SqlIntervalQualifier(unit, null, 
qualifier.getParserPosition());
                     break;
             }
+
             final RelDataType intervalType =
-                    cx.getTypeFactory()
-                            .createTypeWithNullability(
-                                    
cx.getTypeFactory().createSqlIntervalType(qualifier),
-                                    op1.getType().isNullable() || 
op2.getType().isNullable());
-            final RexCall rexCall =
-                    (RexCall)
-                            rexBuilder.makeCall(
-                                    intervalType,
-                                    SqlStdOperatorTable.MINUS_DATE,
-                                    ImmutableList.of(op2, op1));
+                    typeFactory.createTypeWithNullability(
+                            typeFactory.createSqlIntervalType(qualifier),
+                            op1.getType().isNullable() || 
op2.getType().isNullable());
+            final RexNode call2 =
+                    rexBuilder.makeCall(
+                            intervalType,
+                            SqlStdOperatorTable.MINUS_DATE,
+                            ImmutableList.of(truncateFn.apply(op2), 
truncateFn.apply(op1)));
             final RelDataType intType =
-                    cx.getTypeFactory()
-                            .createTypeWithNullability(
-                                    
cx.getTypeFactory().createSqlType(sqlTypeName),
-                                    
SqlTypeUtil.containsNullable(rexCall.getType()));
-            RexNode e = rexBuilder.makeCast(intType, rexCall);
+                    typeFactory.createTypeWithNullability(
+                            typeFactory.createSqlType(sqlTypeName),
+                            SqlTypeUtil.containsNullable(call2.getType()));
+            RexNode e = rexBuilder.makeCast(intType, call2);
             return rexBuilder.multiplyDivide(e, multiplier, divider);
         }
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 0c21d0af78c..22beba442a8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -2288,7 +2288,7 @@ public class RelBuilder {
             @Nullable Iterable<? extends @Nullable String> fieldNames,
             boolean force,
             Iterable<CorrelationId> variablesSet) {
-        @SuppressWarnings("unchecked")
+        @SuppressWarnings({"unchecked", "rawtypes"})
         final List<? extends RexNode> nodeList =
                 nodes instanceof List ? (List) nodes : 
ImmutableList.copyOf(nodes);
         final List<@Nullable String> fieldNameList =
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE 
b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
index 2918375f6cd..6715ff98c4b 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
@@ -8,8 +8,8 @@ This project bundles the following dependencies under the 
Apache Software Licens
 
 - com.google.guava:guava:32.1.3-jre
 - com.google.guava:failureaccess:1.0.1
-- org.apache.calcite:calcite-core:1.33.0
-- org.apache.calcite:calcite-linq4j:1.33.0
+- org.apache.calcite:calcite-core:1.34.0
+- org.apache.calcite:calcite-linq4j:1.34.0
 - org.apache.calcite.avatica:avatica-core:1.23.0
 - org.apache.commons:commons-math3:3.6.1
 - commons-codec:commons-codec:1.15
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
index 087aa6ab438..a02236f687c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
@@ -532,6 +532,48 @@ class OverAggregateITCase extends BatchTestBase {
     )
   }
 
+  @Test
+  def testWindowAggregationSumWithQualify(): Unit = {
+    checkResult(
+      "SELECT d, e FROM Table5 QUALIFY sum(e) OVER (PARTITION BY d ORDER BY e) 
> 20",
+      Seq(
+        row(4, 9),
+        row(4, 10),
+        row(5, 12),
+        row(5, 13),
+        row(5, 14),
+        row(5, 15)
+      )
+    )
+  }
+
+  @Test
+  def testWindowAggregationRowNumberWithQualify(): Unit = {
+    checkResult(
+      "SELECT d, e, row_number() OVER (PARTITION BY d ORDER BY e) AS rownum 
FROM Table5 " +
+        "QUALIFY rownum = 1",
+      Seq(
+        row(1, 1, 1),
+        row(2, 2, 1),
+        row(3, 4, 1),
+        row(4, 7, 1),
+        row(5, 11, 1)
+      )
+    )
+  }
+
+  @Test
+  def testWindowAggregationCountWithQualify(): Unit = {
+    checkResult(
+      "SELECT d, e FROM Table5 QUALIFY count(*) OVER (PARTITION BY d ORDER BY 
e) = 3",
+      Seq(
+        row(3, 6),
+        row(4, 9),
+        row(5, 13)
+      )
+    )
+  }
+
   @Test
   def testWindowAggregationCountWithOrderBy(): Unit = {
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
index 10662856840..677d3602ebf 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
@@ -232,4 +232,23 @@ class RankITCase extends BatchTestBase {
     )
   }
 
+  @Test
+  def testRankFunctionsWithQualify(): Unit = {
+    checkResult(
+      "SELECT a, b, c FROM Table3 " +
+        "QUALIFY RANK() OVER (PARTITION BY b ORDER BY a) > 2 ",
+      Seq(
+        row(6, 3, "Luke Skywalker"),
+        row(9, 4, "Comment#3"),
+        row(10, 4, "Comment#4"),
+        row(13, 5, "Comment#7"),
+        row(14, 5, "Comment#8"),
+        row(15, 5, "Comment#9"),
+        row(18, 6, "Comment#12"),
+        row(19, 6, "Comment#13"),
+        row(20, 6, "Comment#14"),
+        row(21, 6, "Comment#15")
+      )
+    )
+  }
 }
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 4329be9be75..4163b2a8d5f 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -78,8 +78,8 @@ under the License.
        </dependencyManagement>
 
        <properties>
-               <calcite.version>1.33.0</calcite.version>
-               <!-- Calcite 1.33.0 depends on 3.1.8,
+               <calcite.version>1.34.0</calcite.version>
+               <!-- Calcite 1.34.0 depends on 3.1.8,
                at the same time minimum 3.1.x Janino version passing Flink 
tests without WAs is 3.1.10,
                more details are in FLINK-27995 -->
                <janino.version>3.1.10</janino.version>

Reply via email to