[
https://issues.apache.org/jira/browse/FLINK-18626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160969#comment-17160969
]
godfrey he commented on FLINK-18626:
------------------------------------
The {{executeSql}} method only submits the job and does not wait for it to
finish, which differs from the behavior of {{bsEnv.execute()}}. So in an IDE
(mini cluster mode), once the main thread exits, all daemon threads exit as
well. Please check whether this is what is happening in your case.
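
To illustrate the submit-versus-wait difference, here is a minimal sketch in plain Java. It does not use Flink: {{SubmitVsWait}}, {{submitJob}} and the row count are hypothetical stand-ins, and the daemon thread only mimics a job running inside a local mini cluster. The caller gets a handle back immediately and has to block on it, otherwise the JVM may exit before the work completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubmitVsWait {

    // Hypothetical stand-in for an asynchronous job submission.
    // The work runs on a daemon thread, so it cannot keep the JVM alive on its own.
    static CompletableFuture<Integer> submitJob(long workMillis, int rowsToWrite) {
        ExecutorService daemonPool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "job-runner");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(workMillis); // pretend to process and write rows
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return rowsToWrite;
        }, daemonPool);
        daemonPool.shutdown(); // no new tasks; the submitted task still runs
        return result;
    }

    public static void main(String[] args) {
        // Returns immediately, like a submit-only call.
        CompletableFuture<Integer> job = submitJob(200, 3);

        // If main() returned here, the JVM could exit before the daemon
        // thread finishes and nothing would be written.

        // Blocking on the handle keeps the main thread alive until the job is done.
        int rows = job.join();
        System.out.println("job finished, rows written: " + rows);
    }
}
```

In Flink, the corresponding handle is the {{TableResult}} returned by {{executeSql}}, whose {{getJobClient()}} gives access to the submitted job. How to block on it depends on the Flink version, so please check the API docs for the release you are using.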
> The result of aggregate SQL on MySQL cannot write to upsert table sink
> inStreamingMode
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-18626
> URL: https://issues.apache.org/jira/browse/FLINK-18626
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC, Table SQL / API
> Affects Versions: 1.11.0
> Reporter: gaoling ma
> Priority: Major
>
> {code:java}
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);
> bsEnv.setParallelism(1);
> bsTableEnv.executeSql("CREATE TABLE bbb (\n" +
> ......
> ") WITH (\n" +
> " 'connector' = 'jdbc',\n" +
> " 'url' = 'jdbc:mysql://***/***',\n" +
> " 'table-name' = 'bbb',\n" +
> " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
> " 'username' = '***',\n" +
> " 'password' = '***'" +
> ")");
> bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
> " `area_code` VARCHAR,\n" +
> " `stat_date` DATE,\n" +
> " `index` BIGINT,\n" +
> " PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
> ") WITH (\n" +
> " 'connector' = 'jdbc',\n" +
> " 'url' = 'jdbc:mysql://***/***',\n" +
> " 'table-name' = 'aaa',\n" +
> " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
> " 'username' = '***',\n" +
> " 'password' = '***'\n" +
> ")");
>
> bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, " +
> "CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' " +
> "GROUP BY area_code");
> // Table table = bsTableEnv.sqlQuery("SELECT area_code, " +
> // "CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' " +
> // "GROUP BY area_code");
> // DataStream<Tuple2<Boolean, Row>> retractStream =
> // bsTableEnv.toRetractStream(table, Row.class);
> // retractStream.print();
> // bsEnv.execute();
> {code}
> When I write the aggregate SQL results from MySQL into the upsert stream JDBC
> table sink, the program exits automatically without any message. The result
> table aaa is also empty, which it should not be.
> When I use the commented-out code above, which converts the aggregate SQL
> result into a retract stream, the results are printed normally.
> When I replace the data source with Kafka and do not use the commented-out
> code above, the dynamic results are written into table aaa successfully.
> When I replace inStreamingMode() with inBatchMode() and do not use the
> commented-out code above, it also works.