wuchong commented on a change in pull request #11490:
[FLINK-15579][table-planner-blink] UpsertStreamTableSink should work on batch mode
URL: https://github.com/apache/flink/pull/11490#discussion_r396492527
##########
File path:
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
##########
@@ -210,4 +228,76 @@ public void testAppend() throws Exception {
			Row.of(20, 6, Timestamp.valueOf("1970-01-01 00:00:00.02"))
		}, DB_URL, OUTPUT_TABLE2, new String[]{"id", "num", "ts"});
	}
+
+	@Test
+	public void testBatchUpsert() throws Exception {
+		StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+		StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
+		RowTypeInfo rt = (RowTypeInfo) Types.ROW_NAMED(new String[]{"NAME", "SCORE"}, Types.STRING, Types.LONG);
+		Table source = bsTableEnv.fromTableSource(new CollectionTableSource(generateRecords(2), rt));
+		bsTableEnv.registerTable("sourceTable", source);
+		bsTableEnv.sqlUpdate(
+			"CREATE TABLE USER_RESULT(" +
+				"NAME VARCHAR," +
+				"SCORE BIGINT" +
+				") WITH ( " +
+				"'connector.type' = 'jdbc'," +
+				"'connector.url'='" + DB_URL + "'," +
+				"'connector.table' = '" + OUTPUT_TABLE3 + "'" +
+				")");
+
+		bsTableEnv.sqlUpdate("insert into USER_RESULT SELECT s.NAME, s.SCORE " +
+			"FROM sourceTable as s ");
+		bsTableEnv.execute("test");
+
+		check(new Row[] {
+			Row.of("a0", 0L),
+			Row.of("a1", 1L)
+		}, DB_URL, OUTPUT_TABLE3, new String[]{"NAME", "SCORE"});
+	}
+
+	private List<Row> generateRecords(int numRecords) {
+		int arity = 2;
+		List<Row> res = new ArrayList<>(numRecords);
+		for (long i = 0; i < numRecords; i++) {
+			Row row = new Row(arity);
+			row.setField(0, "a" + i);
+			row.setField(1, i);
+			res.add(row);
+		}
+		return res;
+	}
+
+	private static class CollectionTableSource extends InputFormatTableSource<Row> {
Review comment:
We should avoid creating too many testing TableSources. You can use a VALUES clause instead:
```sql
INSERT INTO USER_RESULT
SELECT user_name, score
FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
AS UserCountTable(score, user_name)
```
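
For reference, a minimal Java sketch of how the test body could look on top of that VALUES query. This is only an illustration: it assumes the same `DB_URL`/`OUTPUT_TABLE3` constants, imports, and `check(...)` helper already present in the ITCase, and the method name `testBatchUpsertWithValues` is made up here.
```java
@Test
public void testBatchUpsertWithValues() throws Exception {
	// Same environment setup as in the diff above; only the source changes.
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
	StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

	tEnv.sqlUpdate(
		"CREATE TABLE USER_RESULT(" +
			"NAME VARCHAR," +
			"SCORE BIGINT" +
			") WITH ( " +
			"'connector.type' = 'jdbc'," +
			"'connector.url'='" + DB_URL + "'," +
			"'connector.table' = '" + OUTPUT_TABLE3 + "'" +
			")");

	// Inline test data via VALUES, so no CollectionTableSource / generateRecords is needed.
	tEnv.sqlUpdate(
		"INSERT INTO USER_RESULT " +
		"SELECT user_name, score " +
		"FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) " +
		"AS UserCountTable(score, user_name)");
	tEnv.execute("test");

	// The existing check(...) helper can then assert the rows written to OUTPUT_TABLE3.
}
```
This keeps the ITCase self-contained and avoids adding another one-off TableSource implementation just for this test.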