swuferhong commented on code in PR #20822:
URL: https://github.com/apache/flink/pull/20822#discussion_r1064731310
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortTest.scala:
##########
@@ -46,6 +46,134 @@ class SortTest extends TableTestBase {
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b")
}
+ @Test
+ def testSortedResultIntoMultiSinks(): Unit = {
+ val env = util.tableEnv
+ env.executeSql(s"""
+ |CREATE TABLE Source (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string,
+ | e string
+ | ) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true'
+ |)
+ |""".stripMargin)
+ env.executeSql("""
+ |CREATE TABLE sink1 (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '/tmp/test'
+ |)
+ |""".stripMargin)
+ env.executeSql("""
+ |CREATE TABLE sink2 (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string
+ |) WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '/tmp/test'
+ |)
+ |""".stripMargin)
+ val stmtSet = env.createStatementSet()
+ stmtSet.addInsertSql("INSERT INTO sink1 select a, b, c from Source order
by d")
+ stmtSet.addInsertSql("INSERT INTO sink2 select a, b, c, d from Source
order by d")
+
+ util.verifyExecPlan(stmtSet)
+ }
+
+ @Test
+ def testSortedViewResultIntoMultiSinks(): Unit = {
Review Comment:
> I think the test is more related to the {@link SubPlanReuseTest}. Could
you move the test?
Done!
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SortTest.scala:
##########
@@ -46,6 +46,134 @@ class SortTest extends TableTestBase {
util.verifyExecPlan("SELECT * FROM MyTable ORDER BY a DESC, b")
}
+ @Test
+ def testSortedResultIntoMultiSinks(): Unit = {
+ val env = util.tableEnv
+ env.executeSql(s"""
+ |CREATE TABLE Source (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string,
+ | e string
+ | ) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true'
+ |)
+ |""".stripMargin)
+ env.executeSql("""
+ |CREATE TABLE sink1 (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '/tmp/test'
+ |)
+ |""".stripMargin)
+ env.executeSql("""
+ |CREATE TABLE sink2 (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string
+ |) WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '/tmp/test'
+ |)
+ |""".stripMargin)
+ val stmtSet = env.createStatementSet()
+ stmtSet.addInsertSql("INSERT INTO sink1 select a, b, c from Source order
by d")
+ stmtSet.addInsertSql("INSERT INTO sink2 select a, b, c, d from Source
order by d")
+
+ util.verifyExecPlan(stmtSet)
+ }
+
+ @Test
+ def testSortedViewResultIntoMultiSinks(): Unit = {
+ val env = util.tableEnv
+ env.executeSql(s"""
+ |CREATE TABLE Source (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string,
+ | e string
+ | ) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true'
+ |)
+ |""".stripMargin)
+ val query = "SELECT * FROM Source order by d"
+ val table = env.sqlQuery(query)
+ env.registerTable("SortedTable", table)
+ env.executeSql("""
+ |CREATE TABLE sink1 (
+ | a int,
+ | b bigint,
+ | c string
+ |) WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '/tmp/test'
+ |)
+ |""".stripMargin)
+ env.executeSql("""
+ |CREATE TABLE sink2 (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string
+ |) WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '/tmp/test'
+ |)
+ |""".stripMargin)
+ val stmtSet = env.createStatementSet()
+ stmtSet.addInsertSql("INSERT INTO sink1 select a, b, e from SortedTable")
+ stmtSet.addInsertSql("INSERT INTO sink2 select a, b, d, e from
SortedTable")
+
+ util.verifyExecPlan(stmtSet)
+ }
+
+ @Test
+ def testSortedViewResultIntoSingleSink(): Unit = {
+ val env = util.tableEnv
+ env.executeSql(s"""
+ |CREATE TABLE Source (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string,
+ | e string
+ | ) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'true'
+ |)
+ |""".stripMargin)
+ val query = "SELECT * FROM Source order by d"
+ val table = env.sqlQuery(query)
+ env.registerTable("SortedTable", table)
+ env.executeSql("""
+ |CREATE TABLE sink2 (
+ | a int,
+ | b bigint,
+ | c string,
+ | d string
+ |) WITH (
+ | 'connector' = 'filesystem',
+ | 'format' = 'testcsv',
+ | 'path' = '/tmp/test'
+ |)
+ |""".stripMargin)
+ val stmtSet = env.createStatementSet()
+ stmtSet.addInsertSql("INSERT INTO sink2 select a, b, d, e from
SortedTable")
+ util.verifyExecPlan(stmtSet)
Review Comment:
> This test is not related to the bug. I think we can remove this.
Done!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]