Tartarus0zm commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r934463064


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResults("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)

Review Comment:
   [flink code 
style](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-mockito---use-reusable-test-implementations)
   Refer to this coding specification
   



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, 
MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    val dataId = TestValuesTableFactory.registerData(smallData3)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE MyTable (
+                       |  `a` INT,
+                       |  `b` BIGINT,
+                       |  `c` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'bounded' = 'true',
+                       |  'data-id' = '$dataId'
+                       |)
+       """.stripMargin)
+
+    val resultPath = 
BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+    tEnv
+      .executeSql(s"""
+                     |CREATE TABLE MyCtasTable
+                     | WITH (
+                     |  'connector' = 'filesystem',
+                     |  'format' = 'testcsv',
+                     |  'path' = '$resultPath'
+                     |) AS
+                     | SELECT * FROM MyTable
+       """.stripMargin)
+      .await()
+    val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    val result = TableTestUtil.readFromFile(resultPath)
+    Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to