BIOINSu commented on code in PR #24128:
URL: https://github.com/apache/flink/pull/24128#discussion_r1462820066


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala:
##########
@@ -421,4 +424,81 @@ class TableSourceITCase extends StreamingTestBase {
     val expected = Seq("1,Sarah,1", "2,Rob,1", "3,Mike,1")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  private def innerTestSetParallelism(provider: String, parallelism: Int, index: Int): Unit = {
+    val dataId = TestValuesTableFactory.registerData(data1)
+    val sourceTableName = s"test_para_source_${provider.toLowerCase.trim}_$index"
+    val sinkTableName = s"test_para_sink_${provider.toLowerCase.trim}_$index"
+    tEnv.executeSql(s"""
+                       |CREATE TABLE $sourceTableName (
+                       |  the_month INT,
+                       |  area STRING,
+                       |  product INT
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = '$dataId',
+                       |  'bounded' = 'true',
+                       |  'runtime-source' = '$provider',
+                       |  'scan.parallelism' = '$parallelism',
+                       |  'enable-projection-push-down' = 'false'
+                       |)
+                       |""".stripMargin)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE $sinkTableName (
+                       |  the_month INT,
+                       |  area STRING,
+                       |  product INT
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'sink-insert-only' = 'true'
+                       |)
+                       |""".stripMargin)
+    tEnv.executeSql(s"INSERT INTO $sinkTableName SELECT * FROM $sourceTableName").await()
+  }
+
+  @Test
+  def testParallelismWithSourceFunction(): Unit = {

Review Comment:
   I have added some extra tests to verify the generated transformation. Please 
review the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to