[
https://issues.apache.org/jira/browse/FLINK-18724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek updated FLINK-18724:
-------------------------------------
Component/s: (was: API / Core)
> Integration with DataStream and DataSet API report error
> ---------------------------------------------------------
>
> Key: FLINK-18724
> URL: https://issues.apache.org/jira/browse/FLINK-18724
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Table SQL / API
> Affects Versions: 1.11.1
> Reporter: liang ding
> Priority: Major
>
> I want to create a table from a DataStream(kafka) : there is two reason I
> need to use DataStream:
> # I need decode msg to columns by custom format, in sql mode I don't known
> how to do it.
> # I have realize DeserializationSchema or FlatMapFunction both. when use
> datastream I can do many things before it become a suitable table, that is my
> prefer way in any other apply.
> so I do it like that:
> {code:java}
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings tSet=
> EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv=StreamTableEnvironment.create(env,tSet);
> DataStream<MyRow> stream = env
> .addSource(new FlinkKafkaConsumer<>("test-log", new
> SimpleStringSchema(), properties))
> .flatMap(new LogParser());
> //stream.printToErr();
> tEnv.fromDataStream(stream).select("userId,city").execute().print();
> tEnv.execute("test-sql");
> //env.execute("test");
> {code}
> then I got message:
> {noformat}
> [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select:
> (userId,city) -> to: Row (3/3)] INFO
> org.apache.kafka.clients.FetchSessionHandler - [Consumer
> clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full
> fetch response with extra=(test-log-0, response=(
> [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select:
> (userId,city) -> to: Row (3/3)] INFO
> org.apache.kafka.clients.FetchSessionHandler - [Consumer
> clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full
> fetch response with extra=(test-log-1, response=({noformat}
> it seen like both StreamExecutionEnvironment and StreamTableEnvironment start
> the fetcher and make no one successed.
> and there is no guide Integration which made me confused: should I do
> env.execute or
> tableEnv.execute or both(it's seen not) ? and the blink planner way
--
This message was sent by Atlassian Jira
(v8.3.4#803005)