This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 11b1fdfc7cf [FLINK-34446][table] Port parser's fix of cross join with
lateral from Calcite (#24317)
11b1fdfc7cf is described below
commit 11b1fdfc7cffb74c7515a522b923fc2eba223fb8
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Aug 1 17:45:17 2024 +0200
[FLINK-34446][table] Port parser's fix of cross join with lateral from
Calcite (#24317)
Backport of Calcite's fix
https://github.com/apache/calcite/commit/9fa1e16e326e37688b829a2a73064652df40c90d
thanks to Julian Hyde
---
.../src/main/codegen/templates/Parser.jj | 39 ++-
.../main/java/org/apache/calcite/sql/SqlJoin.java | 289 +++++++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 66 +++++
.../runtime/stream/sql/CorrelateITCase.scala | 19 ++
4 files changed, 393 insertions(+), 20 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
index cc9345acc84..b299a703457 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
+++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
@@ -1996,7 +1996,7 @@ SqlNode FromClause() :
SqlLiteral joinType;
}
{
- e = Join()
+ e = TableRef1(ExprContext.ACCEPT_QUERY_OR_JOIN)
(
// Comma joins should only occur at top-level in the FROM clause.
// Valid:
@@ -2005,33 +2005,30 @@ SqlNode FromClause() :
// Not valid:
// * FROM a CROSS JOIN (b, c)
LOOKAHEAD(1)
- <COMMA> { joinType = JoinType.COMMA.symbol(getPos()); }
- e2 = Join() {
- e = new SqlJoin(joinType.getParserPosition(),
- e,
- SqlLiteral.createBoolean(false, joinType.getParserPosition()),
- joinType,
- e2,
- JoinConditionType.NONE.symbol(SqlParserPos.ZERO),
- null);
- }
+ e = JoinOrCommaTable(e)
)*
{ return e; }
}
-SqlNode Join() :
+SqlNode JoinOrCommaTable(SqlNode e) :
{
- SqlNode e;
+ SqlNode e2;
+ SqlLiteral joinType;
}
{
- e = TableRef1(ExprContext.ACCEPT_QUERY_OR_JOIN)
- (
- LOOKAHEAD(2)
- e = JoinTable(e)
- )*
- {
- return e;
+ LOOKAHEAD(2)
+ <COMMA> { joinType = JoinType.COMMA.symbol(getPos()); }
+ e2 = TableRef1(ExprContext.ACCEPT_QUERY_OR_JOIN) {
+ return new SqlJoin(joinType.getParserPosition(),
+ e,
+ SqlLiteral.createBoolean(false, joinType.getParserPosition()),
+ joinType,
+ e2,
+ JoinConditionType.NONE.symbol(SqlParserPos.ZERO),
+ null);
}
+|
+ e2 = JoinTable(e) { return e2; }
}
/** Matches "LEFT JOIN t ON ...", "RIGHT JOIN t USING ...", "JOIN t". */
@@ -2179,6 +2176,8 @@ SqlNode TableRef3(ExprContext exprContext, boolean
lateral) :
tableRef = addLateral(tableRef, lateral)
[ tableRef = MatchRecognize(tableRef) ]
|
+ LOOKAHEAD(2)
+ [ <LATERAL> ] // "LATERAL" is implicit with "UNNEST", so ignore
<UNNEST> { s = span(); }
args = ParenthesizedQueryOrCommaList(ExprContext.ACCEPT_SUB_QUERY)
[
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlJoin.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlJoin.java
new file mode 100644
index 00000000000..ca7766b33fb
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlJoin.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.util.SqlString;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.Util;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+import static java.util.Objects.requireNonNull;
+
+/** Parse tree node representing a {@code JOIN} clause. */
+public class SqlJoin extends SqlCall {
+ static final SqlJoinOperator COMMA_OPERATOR = new
SqlJoinOperator("COMMA-JOIN", 18);
+ public static final SqlJoinOperator OPERATOR = new SqlJoinOperator("JOIN",
18);
+
+ SqlNode left;
+
+ /** Operand says whether this is a natural join. Must be constant TRUE or
FALSE. */
+ SqlLiteral natural;
+
+ /** Value must be a {@link SqlLiteral}, one of the integer codes for
{@link JoinType}. */
+ SqlLiteral joinType;
+
+ SqlNode right;
+
+ /**
+ * Value must be a {@link SqlLiteral}, one of the integer codes for {@link
JoinConditionType}.
+ */
+ SqlLiteral conditionType;
+
+ @Nullable SqlNode condition;
+
+ // ~ Constructors
-----------------------------------------------------------
+
+ public SqlJoin(
+ SqlParserPos pos,
+ SqlNode left,
+ SqlLiteral natural,
+ SqlLiteral joinType,
+ SqlNode right,
+ SqlLiteral conditionType,
+ @Nullable SqlNode condition) {
+ super(pos);
+ this.left = left;
+ this.natural = requireNonNull(natural, "natural");
+ this.joinType = requireNonNull(joinType, "joinType");
+ this.right = right;
+ this.conditionType = requireNonNull(conditionType, "conditionType");
+ this.condition = condition;
+
+ Preconditions.checkArgument(natural.getTypeName() ==
SqlTypeName.BOOLEAN);
+ conditionType.getValueAs(JoinConditionType.class);
+ joinType.getValueAs(JoinType.class);
+ }
+
+ // ~ Methods
----------------------------------------------------------------
+
+ @Override
+ public SqlOperator getOperator() {
+ //noinspection SwitchStatementWithTooFewBranches
+ switch (getJoinType()) {
+ case COMMA:
+ return COMMA_OPERATOR;
+ default:
+ return OPERATOR;
+ }
+ }
+
+ @Override
+ public SqlKind getKind() {
+ return SqlKind.JOIN;
+ }
+
+ @SuppressWarnings("nullness")
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(left, natural, joinType, right,
conditionType, condition);
+ }
+
+ @SuppressWarnings("assignment.type.incompatible")
+ @Override
+ public void setOperand(int i, @Nullable SqlNode operand) {
+ switch (i) {
+ case 0:
+ left = operand;
+ break;
+ case 1:
+ natural = (SqlLiteral) operand;
+ break;
+ case 2:
+ joinType = (SqlLiteral) operand;
+ break;
+ case 3:
+ right = operand;
+ break;
+ case 4:
+ conditionType = (SqlLiteral) operand;
+ break;
+ case 5:
+ condition = operand;
+ break;
+ default:
+ throw new AssertionError(i);
+ }
+ }
+
+ public final @Nullable SqlNode getCondition() {
+ return condition;
+ }
+
+ /** Returns a {@link JoinConditionType}, never null. */
+ public final JoinConditionType getConditionType() {
+ return conditionType.getValueAs(JoinConditionType.class);
+ }
+
+ public SqlLiteral getConditionTypeNode() {
+ return conditionType;
+ }
+
+ /** Returns a {@link JoinType}, never null. */
+ public final JoinType getJoinType() {
+ return joinType.getValueAs(JoinType.class);
+ }
+
+ public SqlLiteral getJoinTypeNode() {
+ return joinType;
+ }
+
+ public final SqlNode getLeft() {
+ return left;
+ }
+
+ public void setLeft(SqlNode left) {
+ this.left = left;
+ }
+
+ public final boolean isNatural() {
+ return natural.booleanValue();
+ }
+
+ public final SqlLiteral isNaturalNode() {
+ return natural;
+ }
+
+ public final SqlNode getRight() {
+ return right;
+ }
+
+ public void setRight(SqlNode right) {
+ this.right = right;
+ }
+
+ /**
+ * Describes the syntax of the SQL {@code JOIN} operator.
+ *
+ * <p>A variant describes the comma operator, which has lower precedence.
+ */
+ public static class SqlJoinOperator extends SqlOperator {
+ private static final SqlWriter.FrameType FRAME_TYPE =
+ SqlWriter.FrameTypeEnum.create("USING");
+
+ // ~ Constructors
-----------------------------------------------------------
+
+ private SqlJoinOperator(String name, int prec) {
+ super(name, SqlKind.JOIN, prec, true, null, null, null);
+ }
+
+ // ~ Methods
----------------------------------------------------------------
+
+ @Override
+ public SqlSyntax getSyntax() {
+ return SqlSyntax.SPECIAL;
+ }
+
+ @SuppressWarnings("argument.type.incompatible")
+ @Override
+ public SqlCall createCall(
+ @Nullable SqlLiteral functionQualifier,
+ SqlParserPos pos,
+ @Nullable SqlNode... operands) {
+ assert functionQualifier == null;
+ return new SqlJoin(
+ pos,
+ operands[0],
+ (SqlLiteral) operands[1],
+ (SqlLiteral) operands[2],
+ operands[3],
+ (SqlLiteral) operands[4],
+ operands[5]);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int
rightPrec) {
+ final SqlJoin join = (SqlJoin) call;
+
+ join.left.unparse(writer, leftPrec, getLeftPrec());
+ switch (join.getJoinType()) {
+ case COMMA:
+ writer.sep(",", true);
+ break;
+ case CROSS:
+ writer.sep(join.isNatural() ? "NATURAL CROSS JOIN" :
"CROSS JOIN");
+ break;
+ case FULL:
+ writer.sep(join.isNatural() ? "NATURAL FULL JOIN" : "FULL
JOIN");
+ break;
+ case INNER:
+ writer.sep(join.isNatural() ? "NATURAL INNER JOIN" :
"INNER JOIN");
+ break;
+ case LEFT:
+ writer.sep(join.isNatural() ? "NATURAL LEFT JOIN" : "LEFT
JOIN");
+ break;
+ case LEFT_SEMI_JOIN:
+ writer.sep(join.isNatural() ? "NATURAL LEFT SEMI JOIN" :
"LEFT SEMI JOIN");
+ break;
+ case RIGHT:
+ writer.sep(join.isNatural() ? "NATURAL RIGHT JOIN" :
"RIGHT JOIN");
+ break;
+ default:
+ throw Util.unexpected(join.getJoinType());
+ }
+ join.right.unparse(writer, getRightPrec(), rightPrec);
+ SqlNode joinCondition = join.condition;
+ if (joinCondition != null) {
+ switch (join.getConditionType()) {
+ case USING:
+ // No need for an extra pair of parens -- the
condition is a
+ // list. The result is something like "USING (deptno,
gender)".
+ writer.keyword("USING");
+ assert joinCondition instanceof SqlNodeList
+ : "joinCondition should be SqlNodeList, got "
+ joinCondition;
+ final SqlWriter.Frame frame =
writer.startList(FRAME_TYPE, "(", ")");
+ joinCondition.unparse(writer, 0, 0);
+ writer.endList(frame);
+ break;
+
+ case ON:
+ writer.keyword("ON");
+ joinCondition.unparse(writer, leftPrec, rightPrec);
+ break;
+
+ default:
+ throw Util.unexpected(join.getConditionType());
+ }
+ }
+ }
+ }
+
+ @Override
+ public SqlString toSqlString(UnaryOperator<SqlWriterConfig> transform) {
+ SqlNode selectWrapper =
+ new SqlSelect(
+ SqlParserPos.ZERO,
+ SqlNodeList.EMPTY,
+ SqlNodeList.SINGLETON_STAR,
+ this,
+ null,
+ null,
+ null,
+ SqlNodeList.EMPTY,
+ null,
+ null,
+ null,
+ SqlNodeList.EMPTY);
+ return selectWrapper.toSqlString(transform);
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 7f22d23b779..a880ea76c14 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -3266,6 +3266,72 @@ class FlinkSqlParserImplTest extends SqlParserTest {
"CREATE MODEL AS SELECT syntax does
not support to specify explicit output columns."));
}
+ /*
+ * This test was backported from Calcite 1.38 (CALCITE-6266).
+ * Remove it together with upgrade to Calcite 1.38.
+ */
+ @Test
+ void testFromValuesWithoutParens() {
+ sql("select 1 from ^values^('x')")
+ .fails(
+ "(?s)Encountered \"values\" at line 1, column 15\\.\n"
+ + "Was expecting one of:\n"
+ + " \"LATERAL\" \\.\\.\\.\n"
+ + " \"TABLE\" \\.\\.\\.\n"
+ + " <IDENTIFIER> \\.\\.\\.\n"
+ + " <HYPHENATED_IDENTIFIER> \\.\\.\\.\n"
+ + " <QUOTED_IDENTIFIER> \\.\\.\\.\n"
+ + " <BACK_QUOTED_IDENTIFIER> \\.\\.\\.\n"
+ + " <BIG_QUERY_BACK_QUOTED_IDENTIFIER>
\\.\\.\\.\n"
+ + " <BRACKET_QUOTED_IDENTIFIER> \\.\\.\\.\n"
+ + " <UNICODE_QUOTED_IDENTIFIER> \\.\\.\\.\n"
+ + " \"\\(\" \\.\\.\\.\n.*"
+ + " \"UNNEST\" \\.\\.\\.\n.*");
+ }
+
+ /*
+ * This test was backported from Calcite 1.38 (CALCITE-6266).
+ * Remove it together with upgrade to Calcite 1.38.
+ */
+ @Test
+ void testUnnest() {
+ sql("select*from unnest(x)").ok("SELECT *\n" + "FROM UNNEST(`X`)");
+ sql("select*from unnest(x) AS T").ok("SELECT *\n" + "FROM UNNEST(`X`)
AS `T`");
+ // UNNEST cannot be first word in query
+ sql("^unnest^(x)").fails("(?s)Encountered \"unnest\" at.*");
+ // UNNEST with more than one argument
+ final String sql = "select * from dept,\n" + "unnest(dept.employees,
dept.managers)";
+ final String expected =
+ "SELECT *\n" + "FROM `DEPT`,\n" + "UNNEST(`DEPT`.`EMPLOYEES`,
`DEPT`.`MANAGERS`)";
+ sql(sql).ok(expected);
+
+ // LATERAL UNNEST is the same as UNNEST
+ // (LATERAL is implicit for UNNEST, so the parser just ignores it)
+ sql("select * from dept, lateral unnest(dept.employees)")
+ .ok("SELECT *\n" + "FROM `DEPT`,\n" +
"UNNEST(`DEPT`.`EMPLOYEES`)");
+ sql("select * from dept, unnest(dept.employees)")
+ .ok("SELECT *\n" + "FROM `DEPT`,\n" +
"UNNEST(`DEPT`.`EMPLOYEES`)");
+
+ // Does not generate extra parentheses around UNNEST because UNNEST is
+ // a table expression.
+ final String sql1 =
+ ""
+ + "SELECT\n"
+ + " item.name,\n"
+ + " relations.*\n"
+ + "FROM dfs.tmp item\n"
+ + "JOIN (\n"
+ + " SELECT * FROM UNNEST(item.related) i(rels)\n"
+ + ") relations\n"
+ + "ON TRUE";
+ final String expected1 =
+ "SELECT `ITEM`.`NAME`, `RELATIONS`.*\n"
+ + "FROM `DFS`.`TMP` AS `ITEM`\n"
+ + "INNER JOIN (SELECT *\n"
+ + "FROM UNNEST(`ITEM`.`RELATED`) AS `I` (`RELS`)) AS
`RELATIONS` ON TRUE";
+ sql(sql1).ok(expected1);
+ }
+
/** Matcher that invokes the #validate() of the {@link ExtendedSqlNode}
instance. * */
private static class ValidationMatcher extends BaseMatcher<SqlNode> {
private String expectedColumnSql;
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index f0a56bb36c6..e325e8dbb72 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -389,6 +389,25 @@ class CorrelateITCase extends StreamingTestBase {
assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
}
+ @Test
+ def testLateralCrossJoin(): Unit = {
+ val data = List((1, 2, "x|y"))
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.createTemporaryView("T1", t1)
+
+ val sql =
+ "SELECT * FROM T1 as t, LATERAL TABLE(STRING_SPLIT(t.c,'|')) CROSS JOIN
(VALUES ('A'), ('B'));"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
+
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink",
sink)
+ result.executeInsert("MySink").await()
+
+ val expected = List("1,2,x|y,x,A", "1,2,x|y,x,B", "1,2,x|y,y,A",
"1,2,x|y,y,B")
+ assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+ }
+
// TODO support agg
// @Test
// def testCountStarOnCorrelate(): Unit = {