[ 
https://issues.apache.org/jira/browse/FLINK-18626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gaoling ma updated FLINK-18626:
-------------------------------
    Description: 
{code:java}
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
bsEnv.setParallelism(1);
......
bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
                "    `area_code`        VARCHAR,\n" +
                "    `stat_date`        DATE,\n" +
                "    `index`            BIGINT,\n" +
                "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector'  = 'jdbc',\n" +
                "  'url'        = 'jdbc:mysql://***/laowufp_data_test',\n" +
                "  'table-name' = 'aaa',\n" +
                "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'username'   = '***',\n" +
                "  'password'   = '***'\n" +
                ")");
                
                bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, 
CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' 
GROUP BY area_code");
{code}
When I write the aggregate SQL results into upsert stream JDBC table sink, the 
program automatically exits with no hint. The mysql table is also empty which 
it should not be.
The aggregate results suppose to be a restract stream, but another question is 
how to change the restract stream into upsert stream. Or there is a better way 
to continuous update the aggregate SQL results into JDBC table. Your comment is 
appreciated. 

  was:

{code:java}
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
bsEnv.setParallelism(1);
......
bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
                "    `area_code`        VARCHAR,\n" +
                "    `stat_date`        DATE,\n" +
                "    `index`            BIGINT,\n" +
                "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector'  = 'jdbc',\n" +
                "  'url'        = 'jdbc:mysql://***/laowufp_data_test',\n" +
                "  'table-name' = 'aaa',\n" +
                "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'username'   = '***',\n" +
                "  'password'   = '***'\n" +
                ")");
                
                bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, 
CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' 
GROUP BY area_code");
{code}
When I write the aggregate SQL results into upsert stream JDBC table sink, the 
program automatically exits with no hint.
The aggregate results suppose to be a restract stream, but another question is 
how to change the restract stream into upsert stream. Or there is a better way 
to continuous update the aggregate SQL results into JDBC table. Your comment is 
appreciated. 


> the result of aggregate SQL on streaming cannot write to upsert table sink 
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-18626
>                 URL: https://issues.apache.org/jira/browse/FLINK-18626
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / API
>    Affects Versions: 1.11.0
>            Reporter: gaoling ma
>            Priority: Major
>
> {code:java}
> StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
> bsSettings);
> bsEnv.setParallelism(1);
> ......
> bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
>               "    `area_code`        VARCHAR,\n" +
>                 "    `stat_date`      DATE,\n" +
>                 "    `index`          BIGINT,\n" +
>                 "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
>                 ") WITH (\n" +
>                 "  'connector'  = 'jdbc',\n" +
>                 "  'url'        = 'jdbc:mysql://***/laowufp_data_test',\n" +
>                 "  'table-name' = 'aaa',\n" +
>                 "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
>                 "  'username'   = '***',\n" +
>                 "  'password'   = '***'\n" +
>                 ")");
>               
>               bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, 
> CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' 
> GROUP BY area_code");
> {code}
> When I write the aggregate SQL results into upsert stream JDBC table sink, 
> the program automatically exits with no hint. The mysql table is also empty 
> which it should not be.
> The aggregate results suppose to be a restract stream, but another question 
> is how to change the restract stream into upsert stream. Or there is a better 
> way to continuous update the aggregate SQL results into JDBC table. Your 
> comment is appreciated. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to