wuchong commented on a change in pull request #15340:
URL: https://github.com/apache/flink/pull/15340#discussion_r599484931
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
##########
@@ -447,6 +447,50 @@ class TableEnvironmentITCase(tableEnvName: String,
isStreaming: Boolean) extends
assertFirstValues(sinkPath)
}
+ @Test
+ def testTableDMLSync(): Unit = {
+ tEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_DML_SYNC,
Boolean.box(true));
+ val sink1Path = _tempFolder.newFolder().toString
+ tEnv.executeSql(
+ s"""
+ |create table MySink1 (
+ | first string
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$sink1Path',
+ | 'format' = 'testcsv'
+ |)
+ """.stripMargin
+ )
+
+ val sink2Path = _tempFolder.newFolder().toString
+ tEnv.executeSql(
+ s"""
+ |create table MySink2 (
+ | first string
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$sink2Path',
+ | 'format' = 'testcsv'
+ |)
+ """.stripMargin
+ )
+
+ val tableResult1 =
+ tEnv.sqlQuery("select first from MyTable").executeInsert("MySink1",
false)
+ checkInsertTableResult(tableResult1,
"default_catalog.default_database.MySink1")
+ assertFirstValues(sink1Path)
+
+ val tableResult2 =
+ tEnv.sqlQuery("select first from MySink1").executeInsert("MySink2",
false)
+ checkInsertTableResult(tableResult2,
"default_catalog.default_database.MySink2")
+ assertFirstValues(sink2Path)
+
+ // invoke await again
+ tableResult1.await()
+ tableResult2.await()
Review comment:
Why do we need to invoke `await()` again here?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
##########
@@ -447,6 +447,50 @@ class TableEnvironmentITCase(tableEnvName: String,
isStreaming: Boolean) extends
assertFirstValues(sinkPath)
}
+ @Test
+ def testTableDMLSync(): Unit = {
+ tEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_DML_SYNC,
Boolean.box(true));
+ val sink1Path = _tempFolder.newFolder().toString
+ tEnv.executeSql(
+ s"""
+ |create table MySink1 (
+ | first string
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$sink1Path',
+ | 'format' = 'testcsv'
+ |)
+ """.stripMargin
+ )
+
+ val sink2Path = _tempFolder.newFolder().toString
+ tEnv.executeSql(
+ s"""
+ |create table MySink2 (
+ | first string
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$sink2Path',
+ | 'format' = 'testcsv'
+ |)
+ """.stripMargin
+ )
+
+ val tableResult1 =
+ tEnv.sqlQuery("select first from MyTable").executeInsert("MySink1",
false)
+ checkInsertTableResult(tableResult1,
"default_catalog.default_database.MySink1")
+ assertFirstValues(sink1Path)
+
+ val tableResult2 =
+ tEnv.sqlQuery("select first from MySink1").executeInsert("MySink2",
false)
Review comment:
It would be better to also cover other DML methods, e.g. StatementSet and
INSERT INTO.
##########
File path: flink-table/flink-sql-client/src/test/resources/sql/select.q
##########
@@ -50,8 +50,38 @@ java.lang.IllegalArgumentException: testing elements of
values source shouldn't
# (we can't test changelog mode and table mode in IT case)
# ==========================================================================
+SET table.dml-sync=true;
+[INFO] Session property has been set.
+!info
+
+create table MyTable (
+ id int,
+ str string
+) with (
+ 'connector' = 'filesystem',
+ 'path' = '$VAR_PATH',
+ 'format' = 'csv'
+);
+[INFO] Table has been created.
+!info
+
+SELECT id, COUNT(*) as cnt, COUNT(DISTINCT str) as uv
+FROM MyTable
+GROUP BY id;
++----+-------------+----------------------+----------------------+
+| op | id | cnt | uv |
++----+-------------+----------------------+----------------------+
+Received a total of 0 row
+!ok
+
+INSERT INTO MyTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2,
'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] Table update statement has been successfully submitted to the cluster:
+
Review comment:
It would be better to print some information about waiting for the result.
What does Hive print when executing INSERT INTO?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
##########
@@ -447,6 +447,50 @@ class TableEnvironmentITCase(tableEnvName: String,
isStreaming: Boolean) extends
assertFirstValues(sinkPath)
}
+ @Test
+ def testTableDMLSync(): Unit = {
+ tEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_DML_SYNC,
Boolean.box(true));
+ val sink1Path = _tempFolder.newFolder().toString
+ tEnv.executeSql(
+ s"""
+ |create table MySink1 (
+ | first string
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$sink1Path',
+ | 'format' = 'testcsv'
+ |)
+ """.stripMargin
+ )
+
+ val sink2Path = _tempFolder.newFolder().toString
+ tEnv.executeSql(
+ s"""
+ |create table MySink2 (
+ | first string
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$sink2Path',
+ | 'format' = 'testcsv'
+ |)
+ """.stripMargin
+ )
+
+ val tableResult1 =
+ tEnv.sqlQuery("select first from MyTable").executeInsert("MySink1",
false)
+ checkInsertTableResult(tableResult1,
"default_catalog.default_database.MySink1")
+ assertFirstValues(sink1Path)
+
+ val tableResult2 =
+ tEnv.sqlQuery("select first from MySink1").executeInsert("MySink2",
false)
+ checkInsertTableResult(tableResult2,
"default_catalog.default_database.MySink2")
Review comment:
`checkInsertTableResult` will wait until the job is finished, so we have to
assert the file data first and then check the `TableResult`.
##########
File path: flink-table/flink-sql-client/src/test/resources/sql/select.q
##########
@@ -50,8 +50,38 @@ java.lang.IllegalArgumentException: testing elements of
values source shouldn't
# (we can't test changelog mode and table mode in IT case)
# ==========================================================================
+SET table.dml-sync=true;
+[INFO] Session property has been set.
+!info
+
+create table MyTable (
+ id int,
+ str string
+) with (
+ 'connector' = 'filesystem',
+ 'path' = '$VAR_PATH',
+ 'format' = 'csv'
+);
+[INFO] Table has been created.
+!info
+
+SELECT id, COUNT(*) as cnt, COUNT(DISTINCT str) as uv
+FROM MyTable
+GROUP BY id;
++----+-------------+----------------------+----------------------+
+| op | id | cnt | uv |
++----+-------------+----------------------+----------------------+
+Received a total of 0 row
+!ok
+
+INSERT INTO MyTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2,
'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
Review comment:
I suggest moving the insert tests into another file, e.g. `insert.q`.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
##########
@@ -245,7 +247,10 @@ private static String getInputFromPath(String sqlPath)
throws IOException {
// remove the promote prefix
line = line.substring(PROMOTE.length());
}
- contentLines.add(line);
+ // ignore the line begin with Job ID:
+ if (!line.startsWith(JOB_ID)) {
+ contentLines.add(line);
Review comment:
I suggest just removing the random job ID, but still keeping this line.
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -48,6 +48,16 @@ private TableConfigOptions() {}
+ "Note: The old planner will be removed
in Flink 1.14, "
+ "so this option will become obsolete.");
+ @Documentation.TableOption(execMode =
Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<Boolean> TABLE_DML_SYNC =
+ key("table.dml-sync")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Determine the mode to execute dml. It will
execute the next statement "
+ + "when submit the current job in default.
If set true, "
+ + "it will execute the next statement when
the current job finishes.");
Review comment:
What do you think about:
> Specifies if the DML job (i.e. the insert operation) is executed
asynchronously or synchronously. By default, the execution is async, so you can
submit multiple DML jobs at the same time. If this option is set to true, the
insert operation will wait for the job to finish.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]