[ 
https://issues.apache.org/jira/browse/FLINK-18626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161642#comment-17161642
 ] 

gaoling ma commented on FLINK-18626:
------------------------------------

After I added System.in.read() after the executeSql call to block the main 
thread, the result was correct. Thanks for your comment.
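
The workaround above is consistent with executeSql submitting the INSERT job 
asynchronously and the job being torn down when the main method returns (an 
assumption based on the behaviour described here, not something confirmed in 
this thread). Below is a minimal sketch of that pattern using only the JDK. 
The class BlockingSubmitSketch and the method submitAsync are hypothetical 
stand-ins, not Flink API. It shows the caller blocking on the job's future 
instead of on System.in.read().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingSubmitSketch {

    // Hypothetical stand-in for an asynchronous submit call: it returns
    // immediately and does the "writes" on a daemon thread, so the work is
    // abandoned if the JVM exits before the future completes.
    static CompletableFuture<Integer> submitAsync(int rows) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "simulated-job");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<Integer> job = CompletableFuture.supplyAsync(() -> {
            int written = 0;
            for (int i = 0; i < rows; i++) {
                try {
                    Thread.sleep(50); // simulate one slow write to the sink
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                written++;
            }
            return written;
        }, pool);
        // Release the worker thread once the simulated job has finished.
        job.whenComplete((result, error) -> pool.shutdown());
        return job;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> job = submitAsync(3);
        // Without this join() the main thread would return right away and
        // the daemon thread would be killed before writing anything.
        int written = job.join();
        System.out.println("rows written: " + written);
    }
}
```

In Flink itself, the analogous step would be to wait on the TableResult 
returned by executeSql rather than on standard input. As far as I know, 
1.11 exposes the submitted job through TableResult.getJobClient(), and later 
releases add TableResult.await(), but please check the documentation for the 
version in use.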

> The result of aggregate SQL on MySQL cannot write to upsert table sink 
> inStreamingMode 
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-18626
>                 URL: https://issues.apache.org/jira/browse/FLINK-18626
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / API
>    Affects Versions: 1.11.0
>            Reporter: gaoling ma
>            Priority: Major
>
> {code:java}
> StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
> bsSettings);
> bsEnv.setParallelism(1);
> bsTableEnv.executeSql("CREATE TABLE bbb (\n" +
>                               ......
>                               ") WITH (\n" +
>                 "  'connector'  = 'jdbc',\n" +
>                 "  'url'        = 'jdbc:mysql://***/***',\n" +
>                 "  'table-name' = 'bbb',\n" +
>                 "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>                 "  'username'   = '***',\n" +
>                 "  'password'   = '***'" +
>                               ")");
> bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
>               "    `area_code`        VARCHAR,\n" +
>                 "    `stat_date`              DATE,\n" +
>                 "    `index`          BIGINT,\n" +
>                 "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
>                 ") WITH (\n" +
>                 "  'connector'  = 'jdbc',\n" +
>                 "  'url'        = 'jdbc:mysql://***/***',\n" +
>                 "  'table-name' = 'aaa',\n" +
>                 "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>                 "  'username'   = '***',\n" +
>                 "  'password'   = '***'\n" +
>                 ")");
>               
> bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, " +
>                 "CURRENT_DATE AS stat_date, count(*) AS index FROM bbb " +
>                 "WHERE is_record = '是' GROUP BY area_code");
> // Table table = bsTableEnv.sqlQuery("SELECT area_code, " +
> //                 "CURRENT_DATE AS stat_date, count(*) AS index FROM bbb " +
> //                 "WHERE is_record = '是' GROUP BY area_code");
> // DataStream<Tuple2<Boolean, Row>> retractStream =
> //                 bsTableEnv.toRetractStream(table, Row.class);
> // retractStream.print();
> // bsEnv.execute();
> {code}
> When I write the results of an aggregate SQL query on MySQL into an upsert 
> JDBC table sink, the program exits on its own without any message. The 
> result table aaa is also empty, which it should not be.
> When I use the commented-out code above, which converts the aggregate SQL 
> result into a retract stream, the results are printed normally.
> When I replace the data source with Kafka and do not use the commented-out 
> code above, the dynamic results are written into table aaa successfully.
> When I replace inStreamingMode() with inBatchMode() and do not use the 
> commented-out code above, it also works.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
