[ 
https://issues.apache.org/jira/browse/FLINK-18481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159596#comment-17159596
 ] 

gaoling ma edited comment on FLINK-18481 at 7/17/20, 9:34 AM:
--------------------------------------------------------------

                StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);
                bsEnv.setParallelism(1);
                                
                bsTableEnv.executeSql("CREATE TABLE aaa (\n" +
                                "    `rowkind`          BOOLEAN,\n" +
                                "    `area_code`        VARCHAR,\n" +
                "    `stat_date`        DATE,\n" +
                "    `index`            BIGINT" +
                ") WITH (\n" +
                "    'connector'                                        = 
'kafka',\n" +
                "    'topic'                                            = 
'aaa',\n" +
                "    'properties.bootstrap.servers' = '...,\n" +
                "    'properties.group.id'                      = 
'testGroup2',\n" +
                "    'scan.startup.mode'                        = 
'latest-offset',\n" +
                "    'format'                                           = 
'csv'\n" +
                ")");
                
                TableResult table = bsTableEnv.executeSql("select * from aaa");
                table.print();

I used Flink version 1.11-shapshot to run the program, the result only has 
table head, no contents. 
+---------+--------------------------------+------------+----------------------+
| rowkind |                      area_code |  stat_date |                index |
+---------+--------------------------------+------------+----------------------+

When I use another way and another consumer group in the same time, I can read 
the topic real data is:
true,520330111205,2020-07-17,1
true,520425103229,2020-07-17,1
true,522628104000,2020-07-17,1
true,522629105212,2020-07-17,1
true,520304105202,2020-07-17,1
true,522727103212,2020-07-17,1
true,522731000000,2020-07-17,1


was (Author: magaoling):
                StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);
                bsEnv.setParallelism(1);
                                
                bsTableEnv.executeSql("CREATE TABLE ddb_stat_canal_result_kafka 
(\n" +
                                "    `rowkind`          BOOLEAN,\n" +
                                "    `area_code`        VARCHAR,\n" +
                "    `stat_date`        DATE,\n" +
                "    `index`            BIGINT" +
                ") WITH (\n" +
                "    'connector'                                        = 
'kafka',\n" +
                "    'topic'                                            = 
'aaa',\n" +
                "    'properties.bootstrap.servers' = '...,\n" +
                "    'properties.group.id'                      = 
'testGroup2',\n" +
                "    'scan.startup.mode'                        = 
'latest-offset',\n" +
                "    'format'                                           = 
'csv'\n" +
                ")");
                
                TableResult table = bsTableEnv.executeSql("select * from aaa");
                table.print();

I used Flink version 1.11-shapshot to run the program, the result only has 
table head, no contents. 
+---------+--------------------------------+------------+----------------------+
| rowkind |                      area_code |  stat_date |                index |
+---------+--------------------------------+------------+----------------------+

When I use another way and another consumer group in the same time, I can read 
the topic real data is:
true,520330111205,2020-07-17,1
true,520425103229,2020-07-17,1
true,522628104000,2020-07-17,1
true,522629105212,2020-07-17,1
true,520304105202,2020-07-17,1
true,522727103212,2020-07-17,1
true,522731000000,2020-07-17,1

> Kafka connector can't select data
> ---------------------------------
>
>                 Key: FLINK-18481
>                 URL: https://issues.apache.org/jira/browse/FLINK-18481
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: jack sun
>            Priority: Major
>
> When I use flnk1.11-snapshot or 1.12-snapshot, I use flinksql and Kafka 
> connector, such as
> “EnvironmentSettings fsSettings = EnvironmentSettings.newInstance 
> ().useOldPlanner().inStreamingMode().build();
> StreamExecutionEnvironment fsEnv = Stream 
> ExecutionEnvironment.getExecutionEnvironment ();
> StreamTableEnvironment tableEnv = St reamTableEnvironment.create (fsEnv, 
> fsSettings);
> String createA = "CREATE TABLE MyUserTable (\n" +
> " t1 STRING,\n" +
> " t2 INT\n" +
> ") WITH (\n" +
> " ' connector.type ' = 'kafka', \n" +
> " ' connector.version ' = '0.11',\n" +
> " ' connector.topic ' = 'csvtb', \n" +
> " ' connector.properties.bootstrap .servers' = ' localhost:9092 ', \n" +
> " ' connector.startup -mode' = 'earliest-offset', \n" +
> " ' format.type ' = 'csv'\n" +
> ")\n";
> tableEnv.executeSql (createA);
> TableResult insert = tableEnv.executeSql ("INSERT INTO MyUserTable 
> VALUES('test',2)");
> insert.print ();
> TableResult tableResult = tableEnv.executeSql ("SELECT t1,t2 FROM 
> MyUserTable");
> tableResult.print ();”
> This code can insert data into Kafka, but it can't output the result. Why, 
> thank you



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to