[
https://issues.apache.org/jira/browse/FLINK-20637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timo Walther reassigned FLINK-20637:
------------------------------------
Assignee: Timo Walther
> Converting a Table to a DataStream twice results in two data streams
> --------------------------------------------------------------------
>
> Key: FLINK-20637
> URL: https://issues.apache.org/jira/browse/FLINK-20637
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Connectors / Kafka, Table SQL / API
> Affects Versions: 1.11.2
> Reporter: Wu
> Assignee: Timo Walther
> Priority: Major
>
>
> Code
> {code:java}
> //代码占位符
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(50000);
> env.setParallelism(10);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> settings);
>
> tableEnv.executeSql("create table feeds_expose_click_profile ( docId
> string ,buuid string ,predictId string ,docType int ,clickLabel int ,viewTime
> int ,exposeEventTime bigint ,clickEventTime string ,authorId string ,category
> string ,subCategory string ,keywords string ,tags string, eventTime bigint,
> rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000)), WATERMARK FOR
> rowTime AS rowTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka',
> 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' =
> '', 'scan.startup.mode' = 'latest-offset', 'format' = 'json',
> 'json.ignore-parse-errors' = 'false' )");
> Table table = tableEnv.from("feeds_expose_click_profile");
> TypeInformation<Row> typeInfo = table.getSchema().toRowType();
> DataStream dataStream = tableEnv .toRetractStream(table, typeInfo)
> .filter(row -> row.f0)
> .map(row -> row.f1)
> .returns(typeInfo);
> Table tableFilter = tableEnv.sqlQuery("select buuid, authorId, viewTime,
> rowTime from feeds_expose_click_profile");
> tableEnv.createTemporaryView("tableFilter", tableFilter);
> TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType();
> DataStream dataStream1 = tableEnv .toRetractStream(tableFilter, typeInfo1)
> .filter(row -> row.f0)
> .map(row -> row.f1)
> .returns(typeInfo1); dataStream1.print();
> System.out.println(env.getExecutionPlan());
> {code}
>
>
> ExecutionPlan
>
> {code:java}
> //代码占位符
> {
> "nodes" : [ {
> "id" : 1,
> "type" : "Source: TableSourceScan(table=[[default_catalog,
> default_database, feeds_expose_click_profile]], fields=[docId, buuid,
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime,
> authorId, category, subCategory, keywords, tags, eventTime])",
> "pact" : "Data Source",
> "contents" : "Source: TableSourceScan(table=[[default_catalog,
> default_database, feeds_expose_click_profile]], fields=[docId, buuid,
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime,
> authorId, category, subCategory, keywords, tags, eventTime])",
> "parallelism" : 10
> }, {
> "id" : 2,
> "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel,
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory,
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS
> rowTime])",
> "pact" : "Operator",
> "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel,
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory,
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS
> rowTime])",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 1,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 3,
> "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime -
> 5000:INTERVAL SECOND)])",
> "pact" : "Operator",
> "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime -
> 5000:INTERVAL SECOND)])",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 2,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 4,
> "type" : "SinkConversionToTuple2",
> "pact" : "Operator",
> "contents" : "SinkConversionToTuple2",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 3,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 5,
> "type" : "Filter",
> "pact" : "Operator",
> "contents" : "Filter",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 4,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 6,
> "type" : "Map",
> "pact" : "Operator",
> "contents" : "Map",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 5,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 7,
> "type" : "Source: TableSourceScan(table=[[default_catalog,
> default_database, feeds_expose_click_profile]], fields=[docId, buuid,
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime,
> authorId, category, subCategory, keywords, tags, eventTime])",
> "pact" : "Data Source",
> "contents" : "Source: TableSourceScan(table=[[default_catalog,
> default_database, feeds_expose_click_profile]], fields=[docId, buuid,
> predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime,
> authorId, category, subCategory, keywords, tags, eventTime])",
> "parallelism" : 10
> }, {
> "id" : 8,
> "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel,
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory,
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS
> rowTime])",
> "pact" : "Operator",
> "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel,
> viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory,
> keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS
> rowTime])",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 7,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 9,
> "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime -
> 5000:INTERVAL SECOND)])",
> "pact" : "Operator",
> "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime -
> 5000:INTERVAL SECOND)])",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 8,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 10,
> "type" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
> "pact" : "Operator",
> "contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 9,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 11,
> "type" : "SinkConversionToTuple2",
> "pact" : "Operator",
> "contents" : "SinkConversionToTuple2",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 10,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 12,
> "type" : "Filter",
> "pact" : "Operator",
> "contents" : "Filter",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 11,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 13,
> "type" : "Map",
> "pact" : "Operator",
> "contents" : "Map",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 12,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 14,
> "type" : "Sink: Print to Std. Out",
> "pact" : "Data Sink",
> "contents" : "Sink: Print to Std. Out",
> "parallelism" : 10,
> "predecessors" : [ {
> "id" : 13,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> } ]
> }
> {code}
>
> I encountered this problem while using Waterdrop. How can I fix it?
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)