[
https://issues.apache.org/jira/browse/FLINK-18626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
gaoling ma updated FLINK-18626:
-------------------------------
Description:
{code:java}
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
bsEnv.setParallelism(1);
......
bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
" `area_code` VARCHAR,\n" +
" `stat_date` DATE,\n" +
" `index` BIGINT,\n" +
" PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://***/laowufp_data_test',\n" +
" 'table-name' = 'aaa',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = '***',\n" +
" 'password' = '***'\n" +
")");
bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code,
CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是'
GROUP BY area_code");
{code}
When I write the aggregate SQL results into upsert stream JDBC table sink, the
program automatically exits with no hint. The mysql table is also empty which
it should not be.
The aggregate results suppose to be a restract stream, but another question is
how to change the restract stream into upsert stream. Or there is a better way
to continuous update the aggregate SQL results into JDBC table. Your comment is
appreciated.
was:
{code:java}
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
bsEnv.setParallelism(1);
......
bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
" `area_code` VARCHAR,\n" +
" `stat_date` DATE,\n" +
" `index` BIGINT,\n" +
" PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://***/laowufp_data_test',\n" +
" 'table-name' = 'aaa',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'username' = '***',\n" +
" 'password' = '***'\n" +
")");
bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code,
CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是'
GROUP BY area_code");
{code}
When I write the aggregate SQL results into upsert stream JDBC table sink, the
program automatically exits with no hint.
The aggregate results suppose to be a restract stream, but another question is
how to change the restract stream into upsert stream. Or there is a better way
to continuous update the aggregate SQL results into JDBC table. Your comment is
appreciated.
> the result of aggregate SQL on streaming cannot write to upsert table sink
> ---------------------------------------------------------------------------
>
> Key: FLINK-18626
> URL: https://issues.apache.org/jira/browse/FLINK-18626
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC, Table SQL / API
> Affects Versions: 1.11.0
> Reporter: gaoling ma
> Priority: Major
>
> {code:java}
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);
> bsEnv.setParallelism(1);
> ......
> bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
> " `area_code` VARCHAR,\n" +
> " `stat_date` DATE,\n" +
> " `index` BIGINT,\n" +
> " PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
> ") WITH (\n" +
> " 'connector' = 'jdbc',\n" +
> " 'url' = 'jdbc:mysql://***/laowufp_data_test',\n" +
> " 'table-name' = 'aaa',\n" +
> " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
> " 'username' = '***',\n" +
> " 'password' = '***'\n" +
> ")");
>
> bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code,
> CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是'
> GROUP BY area_code");
> {code}
> When I write the aggregate SQL results into upsert stream JDBC table sink,
> the program automatically exits with no hint. The mysql table is also empty
> which it should not be.
> The aggregate results suppose to be a restract stream, but another question
> is how to change the restract stream into upsert stream. Or there is a better
> way to continuous update the aggregate SQL results into JDBC table. Your
> comment is appreciated.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)