This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 11b1fdfc7cf [FLINK-34446][table] Port parser's fix of cross join with lateral from Calcite (#24317)
11b1fdfc7cf is described below

commit 11b1fdfc7cffb74c7515a522b923fc2eba223fb8
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Aug 1 17:45:17 2024 +0200

    [FLINK-34446][table] Port parser's fix of cross join with lateral from Calcite (#24317)
    
    Backport of Calcite's fix https://github.com/apache/calcite/commit/9fa1e16e326e37688b829a2a73064652df40c90d
    thanks to Julian Hyde
---
 .../src/main/codegen/templates/Parser.jj           |  39 ++-
 .../main/java/org/apache/calcite/sql/SqlJoin.java  | 289 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  66 +++++
 .../runtime/stream/sql/CorrelateITCase.scala       |  19 ++
 4 files changed, 393 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj 
b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
index cc9345acc84..b299a703457 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
+++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
@@ -1996,7 +1996,7 @@ SqlNode FromClause() :
     SqlLiteral joinType;
 }
 {
-    e = Join()
+    e = TableRef1(ExprContext.ACCEPT_QUERY_OR_JOIN)
     (
         // Comma joins should only occur at top-level in the FROM clause.
         // Valid:
@@ -2005,33 +2005,30 @@ SqlNode FromClause() :
         // Not valid:
         //  * FROM a CROSS JOIN (b, c)
         LOOKAHEAD(1)
-        <COMMA> { joinType = JoinType.COMMA.symbol(getPos()); }
-        e2 = Join() {
-            e = new SqlJoin(joinType.getParserPosition(),
-                e,
-                SqlLiteral.createBoolean(false, joinType.getParserPosition()),
-                joinType,
-                e2,
-                JoinConditionType.NONE.symbol(SqlParserPos.ZERO),
-                null);
-        }
+        e = JoinOrCommaTable(e)
     )*
     { return e; }
 }
 
-SqlNode Join() :
+SqlNode JoinOrCommaTable(SqlNode e) :
 {
-    SqlNode e;
+    SqlNode e2;
+    SqlLiteral joinType;
 }
 {
-    e = TableRef1(ExprContext.ACCEPT_QUERY_OR_JOIN)
-    (
-        LOOKAHEAD(2)
-        e = JoinTable(e)
-    )*
-    {
-        return e;
+    LOOKAHEAD(2)
+    <COMMA> { joinType = JoinType.COMMA.symbol(getPos()); }
+    e2 = TableRef1(ExprContext.ACCEPT_QUERY_OR_JOIN) {
+        return new SqlJoin(joinType.getParserPosition(),
+            e,
+            SqlLiteral.createBoolean(false, joinType.getParserPosition()),
+            joinType,
+            e2,
+            JoinConditionType.NONE.symbol(SqlParserPos.ZERO),
+            null);
     }
+|
+    e2 = JoinTable(e) { return e2; }
 }
 
 /** Matches "LEFT JOIN t ON ...", "RIGHT JOIN t USING ...", "JOIN t". */
@@ -2179,6 +2176,8 @@ SqlNode TableRef3(ExprContext exprContext, boolean 
lateral) :
         tableRef = addLateral(tableRef, lateral)
         [ tableRef = MatchRecognize(tableRef) ]
     |
+        LOOKAHEAD(2)
+        [ <LATERAL> ] // "LATERAL" is implicit with "UNNEST", so ignore
         <UNNEST> { s = span(); }
         args = ParenthesizedQueryOrCommaList(ExprContext.ACCEPT_SUB_QUERY)
         [
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlJoin.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlJoin.java
new file mode 100644
index 00000000000..ca7766b33fb
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlJoin.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.util.SqlString;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.Util;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+import static java.util.Objects.requireNonNull;
+
+/** Parse tree node representing a {@code JOIN} clause. */
+public class SqlJoin extends SqlCall {
+    static final SqlJoinOperator COMMA_OPERATOR = new 
SqlJoinOperator("COMMA-JOIN", 18);
+    public static final SqlJoinOperator OPERATOR = new SqlJoinOperator("JOIN", 
18);
+
+    SqlNode left;
+
+    /** Operand says whether this is a natural join. Must be constant TRUE or 
FALSE. */
+    SqlLiteral natural;
+
+    /** Value must be a {@link SqlLiteral}, one of the integer codes for 
{@link JoinType}. */
+    SqlLiteral joinType;
+
+    SqlNode right;
+
+    /**
+     * Value must be a {@link SqlLiteral}, one of the integer codes for {@link 
JoinConditionType}.
+     */
+    SqlLiteral conditionType;
+
+    @Nullable SqlNode condition;
+
+    // ~ Constructors 
-----------------------------------------------------------
+
+    public SqlJoin(
+            SqlParserPos pos,
+            SqlNode left,
+            SqlLiteral natural,
+            SqlLiteral joinType,
+            SqlNode right,
+            SqlLiteral conditionType,
+            @Nullable SqlNode condition) {
+        super(pos);
+        this.left = left;
+        this.natural = requireNonNull(natural, "natural");
+        this.joinType = requireNonNull(joinType, "joinType");
+        this.right = right;
+        this.conditionType = requireNonNull(conditionType, "conditionType");
+        this.condition = condition;
+
+        Preconditions.checkArgument(natural.getTypeName() == 
SqlTypeName.BOOLEAN);
+        conditionType.getValueAs(JoinConditionType.class);
+        joinType.getValueAs(JoinType.class);
+    }
+
+    // ~ Methods 
----------------------------------------------------------------
+
+    @Override
+    public SqlOperator getOperator() {
+        //noinspection SwitchStatementWithTooFewBranches
+        switch (getJoinType()) {
+            case COMMA:
+                return COMMA_OPERATOR;
+            default:
+                return OPERATOR;
+        }
+    }
+
+    @Override
+    public SqlKind getKind() {
+        return SqlKind.JOIN;
+    }
+
+    @SuppressWarnings("nullness")
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(left, natural, joinType, right, 
conditionType, condition);
+    }
+
+    @SuppressWarnings("assignment.type.incompatible")
+    @Override
+    public void setOperand(int i, @Nullable SqlNode operand) {
+        switch (i) {
+            case 0:
+                left = operand;
+                break;
+            case 1:
+                natural = (SqlLiteral) operand;
+                break;
+            case 2:
+                joinType = (SqlLiteral) operand;
+                break;
+            case 3:
+                right = operand;
+                break;
+            case 4:
+                conditionType = (SqlLiteral) operand;
+                break;
+            case 5:
+                condition = operand;
+                break;
+            default:
+                throw new AssertionError(i);
+        }
+    }
+
+    public final @Nullable SqlNode getCondition() {
+        return condition;
+    }
+
+    /** Returns a {@link JoinConditionType}, never null. */
+    public final JoinConditionType getConditionType() {
+        return conditionType.getValueAs(JoinConditionType.class);
+    }
+
+    public SqlLiteral getConditionTypeNode() {
+        return conditionType;
+    }
+
+    /** Returns a {@link JoinType}, never null. */
+    public final JoinType getJoinType() {
+        return joinType.getValueAs(JoinType.class);
+    }
+
+    public SqlLiteral getJoinTypeNode() {
+        return joinType;
+    }
+
+    public final SqlNode getLeft() {
+        return left;
+    }
+
+    public void setLeft(SqlNode left) {
+        this.left = left;
+    }
+
+    public final boolean isNatural() {
+        return natural.booleanValue();
+    }
+
+    public final SqlLiteral isNaturalNode() {
+        return natural;
+    }
+
+    public final SqlNode getRight() {
+        return right;
+    }
+
+    public void setRight(SqlNode right) {
+        this.right = right;
+    }
+
+    /**
+     * Describes the syntax of the SQL {@code JOIN} operator.
+     *
+     * <p>A variant describes the comma operator, which has lower precedence.
+     */
+    public static class SqlJoinOperator extends SqlOperator {
+        private static final SqlWriter.FrameType FRAME_TYPE =
+                SqlWriter.FrameTypeEnum.create("USING");
+
+        // ~ Constructors 
-----------------------------------------------------------
+
+        private SqlJoinOperator(String name, int prec) {
+            super(name, SqlKind.JOIN, prec, true, null, null, null);
+        }
+
+        // ~ Methods 
----------------------------------------------------------------
+
+        @Override
+        public SqlSyntax getSyntax() {
+            return SqlSyntax.SPECIAL;
+        }
+
+        @SuppressWarnings("argument.type.incompatible")
+        @Override
+        public SqlCall createCall(
+                @Nullable SqlLiteral functionQualifier,
+                SqlParserPos pos,
+                @Nullable SqlNode... operands) {
+            assert functionQualifier == null;
+            return new SqlJoin(
+                    pos,
+                    operands[0],
+                    (SqlLiteral) operands[1],
+                    (SqlLiteral) operands[2],
+                    operands[3],
+                    (SqlLiteral) operands[4],
+                    operands[5]);
+        }
+
+        @Override
+        public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int 
rightPrec) {
+            final SqlJoin join = (SqlJoin) call;
+
+            join.left.unparse(writer, leftPrec, getLeftPrec());
+            switch (join.getJoinType()) {
+                case COMMA:
+                    writer.sep(",", true);
+                    break;
+                case CROSS:
+                    writer.sep(join.isNatural() ? "NATURAL CROSS JOIN" : 
"CROSS JOIN");
+                    break;
+                case FULL:
+                    writer.sep(join.isNatural() ? "NATURAL FULL JOIN" : "FULL 
JOIN");
+                    break;
+                case INNER:
+                    writer.sep(join.isNatural() ? "NATURAL INNER JOIN" : 
"INNER JOIN");
+                    break;
+                case LEFT:
+                    writer.sep(join.isNatural() ? "NATURAL LEFT JOIN" : "LEFT 
JOIN");
+                    break;
+                case LEFT_SEMI_JOIN:
+                    writer.sep(join.isNatural() ? "NATURAL LEFT SEMI JOIN" : 
"LEFT SEMI JOIN");
+                    break;
+                case RIGHT:
+                    writer.sep(join.isNatural() ? "NATURAL RIGHT JOIN" : 
"RIGHT JOIN");
+                    break;
+                default:
+                    throw Util.unexpected(join.getJoinType());
+            }
+            join.right.unparse(writer, getRightPrec(), rightPrec);
+            SqlNode joinCondition = join.condition;
+            if (joinCondition != null) {
+                switch (join.getConditionType()) {
+                    case USING:
+                        // No need for an extra pair of parens -- the 
condition is a
+                        // list. The result is something like "USING (deptno, 
gender)".
+                        writer.keyword("USING");
+                        assert joinCondition instanceof SqlNodeList
+                                : "joinCondition should be SqlNodeList, got " 
+ joinCondition;
+                        final SqlWriter.Frame frame = 
writer.startList(FRAME_TYPE, "(", ")");
+                        joinCondition.unparse(writer, 0, 0);
+                        writer.endList(frame);
+                        break;
+
+                    case ON:
+                        writer.keyword("ON");
+                        joinCondition.unparse(writer, leftPrec, rightPrec);
+                        break;
+
+                    default:
+                        throw Util.unexpected(join.getConditionType());
+                }
+            }
+        }
+    }
+
+    @Override
+    public SqlString toSqlString(UnaryOperator<SqlWriterConfig> transform) {
+        SqlNode selectWrapper =
+                new SqlSelect(
+                        SqlParserPos.ZERO,
+                        SqlNodeList.EMPTY,
+                        SqlNodeList.SINGLETON_STAR,
+                        this,
+                        null,
+                        null,
+                        null,
+                        SqlNodeList.EMPTY,
+                        null,
+                        null,
+                        null,
+                        SqlNodeList.EMPTY);
+        return selectWrapper.toSqlString(transform);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 7f22d23b779..a880ea76c14 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -3266,6 +3266,72 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                         "CREATE MODEL AS SELECT syntax does 
not support to specify explicit output columns."));
     }
 
+    /*
+     * This test was backported from Calcite 1.38 (CALCITE-6266).
+     * Remove it together with upgrade to Calcite 1.38.
+     */
+    @Test
+    void testFromValuesWithoutParens() {
+        sql("select 1 from ^values^('x')")
+                .fails(
+                        "(?s)Encountered \"values\" at line 1, column 15\\.\n"
+                                + "Was expecting one of:\n"
+                                + "    \"LATERAL\" \\.\\.\\.\n"
+                                + "    \"TABLE\" \\.\\.\\.\n"
+                                + "    <IDENTIFIER> \\.\\.\\.\n"
+                                + "    <HYPHENATED_IDENTIFIER> \\.\\.\\.\n"
+                                + "    <QUOTED_IDENTIFIER> \\.\\.\\.\n"
+                                + "    <BACK_QUOTED_IDENTIFIER> \\.\\.\\.\n"
+                                + "    <BIG_QUERY_BACK_QUOTED_IDENTIFIER> 
\\.\\.\\.\n"
+                                + "    <BRACKET_QUOTED_IDENTIFIER> \\.\\.\\.\n"
+                                + "    <UNICODE_QUOTED_IDENTIFIER> \\.\\.\\.\n"
+                                + "    \"\\(\" \\.\\.\\.\n.*"
+                                + "    \"UNNEST\" \\.\\.\\.\n.*");
+    }
+
+    /*
+     * This test was backported from Calcite 1.38 (CALCITE-6266).
+     * Remove it together with upgrade to Calcite 1.38.
+     */
+    @Test
+    void testUnnest() {
+        sql("select*from unnest(x)").ok("SELECT *\n" + "FROM UNNEST(`X`)");
+        sql("select*from unnest(x) AS T").ok("SELECT *\n" + "FROM UNNEST(`X`) 
AS `T`");
+        // UNNEST cannot be first word in query
+        sql("^unnest^(x)").fails("(?s)Encountered \"unnest\" at.*");
+        // UNNEST with more than one argument
+        final String sql = "select * from dept,\n" + "unnest(dept.employees, 
dept.managers)";
+        final String expected =
+                "SELECT *\n" + "FROM `DEPT`,\n" + "UNNEST(`DEPT`.`EMPLOYEES`, 
`DEPT`.`MANAGERS`)";
+        sql(sql).ok(expected);
+
+        // LATERAL UNNEST is the same as UNNEST
+        // (LATERAL is implicit for UNNEST, so the parser just ignores it)
+        sql("select * from dept, lateral unnest(dept.employees)")
+                .ok("SELECT *\n" + "FROM `DEPT`,\n" + 
"UNNEST(`DEPT`.`EMPLOYEES`)");
+        sql("select * from dept, unnest(dept.employees)")
+                .ok("SELECT *\n" + "FROM `DEPT`,\n" + 
"UNNEST(`DEPT`.`EMPLOYEES`)");
+
+        // Does not generate extra parentheses around UNNEST because UNNEST is
+        // a table expression.
+        final String sql1 =
+                ""
+                        + "SELECT\n"
+                        + "  item.name,\n"
+                        + "  relations.*\n"
+                        + "FROM dfs.tmp item\n"
+                        + "JOIN (\n"
+                        + "  SELECT * FROM UNNEST(item.related) i(rels)\n"
+                        + ") relations\n"
+                        + "ON TRUE";
+        final String expected1 =
+                "SELECT `ITEM`.`NAME`, `RELATIONS`.*\n"
+                        + "FROM `DFS`.`TMP` AS `ITEM`\n"
+                        + "INNER JOIN (SELECT *\n"
+                        + "FROM UNNEST(`ITEM`.`RELATED`) AS `I` (`RELS`)) AS 
`RELATIONS` ON TRUE";
+        sql(sql1).ok(expected1);
+    }
+
     /** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} 
instance. * */
     private static class ValidationMatcher extends BaseMatcher<SqlNode> {
         private String expectedColumnSql;
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index f0a56bb36c6..e325e8dbb72 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -389,6 +389,25 @@ class CorrelateITCase extends StreamingTestBase {
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
 
+  @Test
+  def testLateralCrossJoin(): Unit = {
+    val data = List((1, 2, "x|y"))
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("T1", t1)
+
+    val sql =
+      "SELECT * FROM T1 as t, LATERAL TABLE(STRING_SPLIT(t.c,'|')) CROSS JOIN 
(VALUES ('A'), ('B'));"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
+    
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", 
sink)
+    result.executeInsert("MySink").await()
+
+    val expected = List("1,2,x|y,x,A", "1,2,x|y,x,B", "1,2,x|y,y,A", 
"1,2,x|y,y,B")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
   // TODO support agg
 //  @Test
 //  def testCountStarOnCorrelate(): Unit = {

Reply via email to