This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 729b8b81a77 [FLINK-35935][table-planner] Fix RTAS not supporting LIMIT
729b8b81a77 is described below

commit 729b8b81a77ba6c32711216b88a1bf57ccddfadc
Author: Xuyang <[email protected]>
AuthorDate: Thu Aug 15 09:23:28 2024 +0800

    [FLINK-35935][table-planner] Fix RTAS not supporting LIMIT
    
    This closes #25185
---
 .../converters/SqlReplaceTableAsConverter.java     |  4 +-
 .../SqlRTASNodeToOperationConverterTest.java       | 10 ++++
 .../planner/runtime/batch/sql/RTASITCase.java      | 22 +++++++++
 .../planner/runtime/stream/sql/RTASITCase.java     | 31 +++++++++++-
 .../runtime/batch/sql/TableSinkITCase.scala        | 55 ++++++++++------------
 .../runtime/stream/sql/TableSinkITCase.scala       | 24 ++++++++++
 6 files changed, 114 insertions(+), 32 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index 396d50bccb5..83b10eb97d5 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -49,10 +49,10 @@ public class SqlReplaceTableAsConverter implements 
SqlNodeConverter<SqlReplaceTa
         ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
 
         SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
-        context.getSqlValidator().validate(asQuerySqlNode);
+        SqlNode validated = context.getSqlValidator().validate(asQuerySqlNode);
         QueryOperation query =
                 new PlannerQueryOperation(
-                        context.toRelRoot(asQuerySqlNode).project(),
+                        context.toRelRoot(validated).project(),
                         () -> context.toQuotedSqlString(asQuerySqlNode));
 
         // get table comment
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 8a6fc806cab..cc7b5a3f1af 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -67,6 +67,16 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
         testCommonReplaceTableAs(sql, tableName, null);
     }
 
+    @Test
+    public void testCreateOrReplaceTableASWithLimit() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as (SELECT * FROM 
t1 LIMIT 5)";
+        testCommonReplaceTableAs(sql, tableName, null);
+    }
+
     private void testCommonReplaceTableAs(
             String sql, String tableName, @Nullable String tableComment) {
         ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("builtin", 
"default", tableName);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
index ec077566ccf..5c02656c5c6 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
@@ -78,6 +78,28 @@ class RTASITCase extends BatchTestBase {
         verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
     }
 
+    @Test
+    void testReplaceTableASWithSortLimit() throws Exception {
+        tEnv().executeSql(
+                        "REPLACE TABLE target WITH ('connector' = 'values',"
+                                + " 'bounded' = 'true')"
+                                + " AS (SELECT * FROM source order by `a` 
LIMIT 2)")
+                .await();
+
+        // verify written rows
+        
assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+                .isEqualTo("[+I[1, 1, Hi], +I[2, 2, Hello]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "b", "c"},
+                        new AbstractDataType[] {
+                            DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.STRING()
+                        });
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
     @Test
     void testReplaceTableASWithTableNotExist() {
         assertThatThrownBy(() -> tEnv().executeSql("REPLACE TABLE t AS SELECT 
* FROM source"))
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
index c659134c017..f720dde7eb7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -108,6 +108,30 @@ class RTASITCase extends StreamingTestBase {
         verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
     }
 
+    @Test
+    void testCreateOrReplaceTableASWithSortLimit() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE target WITH ('connector' = 
'values',"
+                                + " 'sink-insert-only' = 'false')"
+                                + " AS (SELECT a, c FROM source order by `a` 
LIMIT 2)")
+                .await();
+
+        // verify written rows
+        
assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+                .isEqualTo("[+I[1, Hi], +I[2, Hello]]");
+
+        // verify the table after replacing
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("connector", "values");
+        expectedOptions.put("sink-insert-only", "false");
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "c"},
+                        new AbstractDataType[] {DataTypes.INT(), 
DataTypes.STRING()},
+                        expectedOptions);
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
     @Test
     void testCreateOrReplaceTableASWithTableNotExist() throws Exception {
         tEnv().executeSql(
@@ -130,11 +154,16 @@ class RTASITCase extends StreamingTestBase {
 
     private CatalogTable getExpectCatalogTable(
             String[] cols, AbstractDataType<?>[] fieldDataTypes) {
+        return getExpectCatalogTable(cols, fieldDataTypes, 
getDefaultTargetTableOptions());
+    }
+
+    private CatalogTable getExpectCatalogTable(
+            String[] cols, AbstractDataType<?>[] fieldDataTypes, Map<String, 
String> tableOptions) {
         return CatalogTable.of(
                 Schema.newBuilder().fromFields(cols, fieldDataTypes).build(),
                 null,
                 Collections.emptyList(),
-                getDefaultTargetTableOptions());
+                tableOptions);
     }
 
     private Map<String, String> getDefaultTargetTableOptions() {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index d0fdc2ad792..a32e7374c06 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -28,12 +28,13 @@ import 
org.apache.flink.table.planner.runtime.utils.TestData.smallData3
 import org.apache.flink.table.planner.utils.TableTestUtil
 
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 class TableSinkITCase extends BatchTestBase {
 
-  @Test
-  def testTableHints(): Unit = {
+  @BeforeEach
+  override def before(): Unit = {
+    super.before()
     val dataId = TestValuesTableFactory.registerData(smallData3)
     tEnv.executeSql(s"""
                        |CREATE TABLE MyTable (
@@ -46,7 +47,10 @@ class TableSinkITCase extends BatchTestBase {
                        |  'data-id' = '$dataId'
                        |)
        """.stripMargin)
+  }
 
+  @Test
+  def testTableHints(): Unit = {
     val resultPath = createTempFolder().getAbsolutePath
     tEnv.executeSql(s"""
                        |CREATE TABLE MySink (
@@ -89,19 +93,6 @@ class TableSinkITCase extends BatchTestBase {
 
   @Test
   def testCreateTableAsSelect(): Unit = {
-    val dataId = TestValuesTableFactory.registerData(smallData3)
-    tEnv.executeSql(s"""
-                       |CREATE TABLE MyTable (
-                       |  `a` INT,
-                       |  `b` BIGINT,
-                       |  `c` STRING
-                       |) WITH (
-                       |  'connector' = 'values',
-                       |  'bounded' = 'true',
-                       |  'data-id' = '$dataId'
-                       |)
-       """.stripMargin)
-
     val resultPath = createTempFolder().getAbsolutePath
     tEnv
       .executeSql(s"""
@@ -137,21 +128,27 @@ class TableSinkITCase extends BatchTestBase {
   }
 
   @Test
-  def testCreateTableAsSelectWithoutOptions(): Unit = {
-    // TODO CTAS supports ManagedTable
-    val dataId = TestValuesTableFactory.registerData(smallData3)
-    tEnv.executeSql(s"""
-                       |CREATE TABLE MyTable (
-                       |  `a` INT,
-                       |  `b` BIGINT,
-                       |  `c` STRING
-                       |) WITH (
-                       |  'connector' = 'values',
-                       |  'bounded' = 'true',
-                       |  'data-id' = '$dataId'
-                       |)
+  def testCreateTableAsSelectWithSortLimit(): Unit = {
+    val resultPath = createTempFolder().getAbsolutePath
+    tEnv
+      .executeSql(s"""
+                     |CREATE TABLE MyCtasTable
+                     | WITH (
+                     |  'connector' = 'filesystem',
+                     |  'format' = 'testcsv',
+                     |  'path' = '$resultPath'
+                     |) AS
+                     | (SELECT * FROM MyTable order by `a` LIMIT 2)
        """.stripMargin)
+      .await()
+    val expected = Seq("1,1,Hi", "2,2,Hello")
+    val result = TableTestUtil.readFromFile(resultPath)
+    assertThat(result.sorted).isEqualTo(expected.sorted)
+  }
 
+  @Test
+  def testCreateTableAsSelectWithoutOptions(): Unit = {
+    // TODO CTAS supports ManagedTable
     assertThatThrownBy(
       () =>
         tEnv
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index e4303d71e50..587fda30310 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -299,6 +299,30 @@ class TableSinkITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted)
   }
 
+  @TestTemplate
+  def testCreateTableAsSelectWithSortLimit(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'false'
+                    |) AS
+                    |  (SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src order by `votes` LIMIT 2)
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    assertThat(actual.sorted).isEqualTo(expected.sorted)
+  }
+
   @TestTemplate
   def testCreateTableAsSelectWithoutOptions(): Unit = {
     // TODO: CTAS supports ManagedTable

Reply via email to