This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 729b8b81a77 [FLINK-35935][table-planner] Fix RTAS not supporting LIMIT
729b8b81a77 is described below
commit 729b8b81a77ba6c32711216b88a1bf57ccddfadc
Author: Xuyang <[email protected]>
AuthorDate: Thu Aug 15 09:23:28 2024 +0800
[FLINK-35935][table-planner] Fix RTAS not supporting LIMIT
This closes #25185
---
.../converters/SqlReplaceTableAsConverter.java | 4 +-
.../SqlRTASNodeToOperationConverterTest.java | 10 ++++
.../planner/runtime/batch/sql/RTASITCase.java | 22 +++++++++
.../planner/runtime/stream/sql/RTASITCase.java | 31 +++++++++++-
.../runtime/batch/sql/TableSinkITCase.scala | 55 ++++++++++------------
.../runtime/stream/sql/TableSinkITCase.scala | 24 ++++++++++
6 files changed, 114 insertions(+), 32 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index 396d50bccb5..83b10eb97d5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -49,10 +49,10 @@ public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTa
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
- context.getSqlValidator().validate(asQuerySqlNode);
+ SqlNode validated = context.getSqlValidator().validate(asQuerySqlNode);
QueryOperation query =
new PlannerQueryOperation(
- context.toRelRoot(asQuerySqlNode).project(),
+ context.toRelRoot(validated).project(),
() -> context.toQuotedSqlString(asQuerySqlNode));
// get table comment
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 8a6fc806cab..cc7b5a3f1af 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -67,6 +67,16 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve
testCommonReplaceTableAs(sql, tableName, null);
}
+ @Test
+ public void testCreateOrReplaceTableASWithLimit() {
+ String tableName = "create_or_replace_table";
+ String sql =
+ "CREATE OR REPLACE TABLE "
+ + tableName
+ + " WITH ('k1' = 'v1', 'k2' = 'v2') as (SELECT * FROM t1 LIMIT 5)";
+ testCommonReplaceTableAs(sql, tableName, null);
+ }
+
private void testCommonReplaceTableAs(
String sql, String tableName, @Nullable String tableComment) {
ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("builtin", "default", tableName);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
index ec077566ccf..5c02656c5c6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
@@ -78,6 +78,28 @@ class RTASITCase extends BatchTestBase {
verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
}
+ @Test
+ void testReplaceTableASWithSortLimit() throws Exception {
+ tEnv().executeSql(
+ "REPLACE TABLE target WITH ('connector' = 'values',"
+ + " 'bounded' = 'true')"
+ + " AS (SELECT * FROM source order by `a` LIMIT 2)")
+ .await();
+
+ // verify written rows
+ assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+ .isEqualTo("[+I[1, 1, Hi], +I[2, 2, Hello]]");
+
+ // verify the table after replacing
+ CatalogTable expectCatalogTable =
+ getExpectCatalogTable(
+ new String[] {"a", "b", "c"},
+ new AbstractDataType[] {
+ DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()
+ });
+ verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+ }
+
@Test
void testReplaceTableASWithTableNotExist() {
assertThatThrownBy(() -> tEnv().executeSql("REPLACE TABLE t AS SELECT * FROM source"))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
index c659134c017..f720dde7eb7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -108,6 +108,30 @@ class RTASITCase extends StreamingTestBase {
verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
}
+ @Test
+ void testCreateOrReplaceTableASWithSortLimit() throws Exception {
+ tEnv().executeSql(
+ "CREATE OR REPLACE TABLE target WITH ('connector' = 'values',"
+ + " 'sink-insert-only' = 'false')"
+ + " AS (SELECT a, c FROM source order by `a` LIMIT 2)")
+ .await();
+
+ // verify written rows
+ assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
+ .isEqualTo("[+I[1, Hi], +I[2, Hello]]");
+
+ // verify the table after replacing
+ Map<String, String> expectedOptions = new HashMap<>();
+ expectedOptions.put("connector", "values");
+ expectedOptions.put("sink-insert-only", "false");
+ CatalogTable expectCatalogTable =
+ getExpectCatalogTable(
+ new String[] {"a", "c"},
+ new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()},
+ expectedOptions);
+ verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+ }
+
@Test
void testCreateOrReplaceTableASWithTableNotExist() throws Exception {
tEnv().executeSql(
@@ -130,11 +154,16 @@ class RTASITCase extends StreamingTestBase {
private CatalogTable getExpectCatalogTable(
String[] cols, AbstractDataType<?>[] fieldDataTypes) {
+ return getExpectCatalogTable(cols, fieldDataTypes, getDefaultTargetTableOptions());
+ }
+
+ private CatalogTable getExpectCatalogTable(
+ String[] cols, AbstractDataType<?>[] fieldDataTypes, Map<String, String> tableOptions) {
return CatalogTable.of(
Schema.newBuilder().fromFields(cols, fieldDataTypes).build(),
null,
Collections.emptyList(),
- getDefaultTargetTableOptions());
+ tableOptions);
}
private Map<String, String> getDefaultTargetTableOptions() {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index d0fdc2ad792..a32e7374c06 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -28,12 +28,13 @@ import org.apache.flink.table.planner.runtime.utils.TestData.smallData3
import org.apache.flink.table.planner.utils.TableTestUtil
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{BeforeEach, Test}
class TableSinkITCase extends BatchTestBase {
- @Test
- def testTableHints(): Unit = {
+ @BeforeEach
+ override def before(): Unit = {
+ super.before()
val dataId = TestValuesTableFactory.registerData(smallData3)
tEnv.executeSql(s"""
|CREATE TABLE MyTable (
@@ -46,7 +47,10 @@ class TableSinkITCase extends BatchTestBase {
| 'data-id' = '$dataId'
|)
""".stripMargin)
+ }
+ @Test
+ def testTableHints(): Unit = {
val resultPath = createTempFolder().getAbsolutePath
tEnv.executeSql(s"""
|CREATE TABLE MySink (
@@ -89,19 +93,6 @@ class TableSinkITCase extends BatchTestBase {
@Test
def testCreateTableAsSelect(): Unit = {
- val dataId = TestValuesTableFactory.registerData(smallData3)
- tEnv.executeSql(s"""
- |CREATE TABLE MyTable (
- | `a` INT,
- | `b` BIGINT,
- | `c` STRING
- |) WITH (
- | 'connector' = 'values',
- | 'bounded' = 'true',
- | 'data-id' = '$dataId'
- |)
- """.stripMargin)
-
val resultPath = createTempFolder().getAbsolutePath
tEnv
.executeSql(s"""
@@ -137,21 +128,27 @@ class TableSinkITCase extends BatchTestBase {
}
@Test
- def testCreateTableAsSelectWithoutOptions(): Unit = {
- // TODO CTAS supports ManagedTable
- val dataId = TestValuesTableFactory.registerData(smallData3)
- tEnv.executeSql(s"""
- |CREATE TABLE MyTable (
- | `a` INT,
- | `b` BIGINT,
- | `c` STRING
- |) WITH (
- | 'connector' = 'values',
- | 'bounded' = 'true',
- | 'data-id' = '$dataId'
- |)
+ def testCreateTableAsSelectWithSortLimit(): Unit = {
+ val resultPath = createTempFolder().getAbsolutePath
+ tEnv
+ .executeSql(s"""
+ |CREATE TABLE MyCtasTable
+ | WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '$resultPath'
+ |) AS
+ | (SELECT * FROM MyTable order by `a` LIMIT 2)
""".stripMargin)
+ .await()
+ val expected = Seq("1,1,Hi", "2,2,Hello")
+ val result = TableTestUtil.readFromFile(resultPath)
+ assertThat(result.sorted).isEqualTo(expected.sorted)
+ }
+ @Test
+ def testCreateTableAsSelectWithoutOptions(): Unit = {
+ // TODO CTAS supports ManagedTable
assertThatThrownBy(
() =>
tEnv
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index e4303d71e50..587fda30310 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -299,6 +299,30 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted)
}
+ @TestTemplate
+ def testCreateTableAsSelectWithSortLimit(): Unit = {
+ tEnv
+ .executeSql("""
+ |CREATE TABLE MyCtasTable
+ | WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |) AS
+ | (SELECT
+ | `person`,
+ | `votes`
+ | FROM
+ | src order by `votes` LIMIT 2)
+ |""".stripMargin)
+ .await()
+ val actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable")
+ val expected = List(
+ "+I[jason, 1]",
+ "+I[jason, 1]"
+ )
+ assertThat(actual.sorted).isEqualTo(expected.sorted)
+ }
+
@TestTemplate
def testCreateTableAsSelectWithoutOptions(): Unit = {
// TODO: CTAS supports ManagedTable