lsyldliu commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r934198540
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE,
MemorySize.parse("1kb"))
checkResult("SELECT 1", Seq(row(1)))
}
+
+ @Test
+ def testCreateTableAsSelect(): Unit = {
Review Comment:
Please also test the case when the `connector` option is not specified.
Moreover, could we also add a test for managed tables?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends
StreamingWithStateTestBase
)
assertEquals(expected.sorted, result.sorted)
}
+
+ @Test
+ def testCreateTableAsSelect(): Unit = {
Review Comment:
ditto
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends
StreamingWithStateTestBase
)
assertEquals(expected.sorted, result.sorted)
}
+
+ @Test
+ def testCreateTableAsSelect(): Unit = {
+ tEnv
+ .executeSql("""
+ |CREATE TABLE MyCtasTable
+ | WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |) AS
+ | SELECT
+ | `person`,
+ | `votes`
+ | FROM
+ | src
+ |""".stripMargin)
+ .await()
+ val actual = TestValuesTableFactory.getResults("MyCtasTable")
+ val expected = List(
+ "+I[jason, 1]",
+ "+I[jason, 1]",
+ "+I[jason, 1]",
+ "+I[jason, 1]"
+ )
+ Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)
Review Comment:
Why not follow the other tests and use `Assert.assertEquals(expected.sorted,
result1.sorted)` directly?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
sqlCreateTable.isTemporary());
}
+ /** Convert the {@link SqlCreateTableAs} node. */
+ Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner,
SqlCreateTableAs sqlCreateTable) {
+ sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
+
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
+ ObjectIdentifier identifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+ PlannerQueryOperation query =
+ (PlannerQueryOperation)
+ SqlToOperationConverter.convert(
+ flinkPlanner, catalogManager,
sqlCreateTable.getAsQuery())
+ .orElseThrow(
+ () ->
+ new TableException(
+ "CTAS Unsupported node
type "
Review Comment:
```suggestion
"CTAS unsupported
node type "
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
sqlCreateTable.isTemporary());
}
+ /** Convert the {@link SqlCreateTableAs} node. */
+ Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner,
SqlCreateTableAs sqlCreateTable) {
+ sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
+
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
+ ObjectIdentifier identifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+ PlannerQueryOperation query =
+ (PlannerQueryOperation)
+ SqlToOperationConverter.convert(
+ flinkPlanner, catalogManager,
sqlCreateTable.getAsQuery())
+ .orElseThrow(
+ () ->
+ new TableException(
+ "CTAS Unsupported node
type "
+ +
sqlCreateTable
+
.getAsQuery()
+
.getClass()
+
.getSimpleName()));
+ Map<String, String> properties = new HashMap<>();
Review Comment:
Here we can reuse the `createCatalogTable` method.
```
UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
ObjectIdentifier identifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
CreateTableOperation createTableOperation =
new CreateTableOperation(
identifier,
CatalogTable.of(Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(),
catalogTable.getComment(),
catalogTable.getPartitionKeys(),
catalogTable.getOptions()),
sqlCreateTable.isIfNotExists(),
sqlCreateTable.isTemporary());
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
sqlCreateTable.isTemporary());
}
+ /** Convert the {@link SqlCreateTableAs} node. */
+ Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner,
SqlCreateTableAs sqlCreateTable) {
+ sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
Review Comment:
This validation is not needed; it has already been validated in the
`SqlCreateTableAs#validate` method.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE,
MemorySize.parse("1kb"))
checkResult("SELECT 1", Seq(row(1)))
}
+
Review Comment:
Please also add plan-related tests in `TableSinkTest`. Also consider
testing the case when the `connector` option is not specified. Moreover, could
we add a test for managed tables?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE,
MemorySize.parse("1kb"))
checkResult("SELECT 1", Seq(row(1)))
}
+
+ @Test
+ def testCreateTableAsSelect(): Unit = {
+ val dataId = TestValuesTableFactory.registerData(smallData3)
+ tEnv.executeSql(s"""
+ |CREATE TABLE MyTable (
+ | `a` INT,
+ | `b` BIGINT,
+ | `c` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true',
+ | 'data-id' = '$dataId'
+ |)
+ """.stripMargin)
+
+ val resultPath =
BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+ tEnv
+ .executeSql(s"""
+ |CREATE TABLE MyCtasTable
+ | WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '$resultPath'
+ |) AS
+ | SELECT * FROM MyTable
+ """.stripMargin)
+ .await()
+ val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ val result = TableTestUtil.readFromFile(resultPath)
+ Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)
Review Comment:
Why not follow the other tests and use `Assert.assertEquals(expected.sorted,
result1.sorted)` directly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]