[ 
https://issues.apache.org/jira/browse/FLINK-18145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127505#comment-17127505
 ] 

Benchao Li commented on FLINK-18145:
------------------------------------

[~hehuiyuan] If you want to use subgraph optimization, you should use 
`TableEnvironment` instead of `StreamTableEnvironment`.
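
To illustrate why the choice of environment matters, here is a toy model in plain Java. It is not Flink code, and all names in it are hypothetical. It assumes, as I understand the Blink planner, that `StreamTableEnvironment` translates each `toAppendStream` / `sqlUpdate` call eagerly and on its own, while `TableEnvironment` buffers the inserts and plans them together at execution time, so a common scan can be shared:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: pipelines are plain strings of the form "source#N -> sink".
final class SubgraphReuseSketch {

    /** Eager translation: each sink is planned alone, so each gets its own copy of the source. */
    static List<String> planEagerly(List<String> sinks) {
        List<String> pipelines = new ArrayList<>();
        int sourceId = 0;
        for (String sink : sinks) {
            sourceId++; // a fresh source operator per sink
            pipelines.add("source#" + sourceId + " -> " + sink);
        }
        return pipelines;
    }

    /** Buffered translation: all sinks are planned together and share a single source. */
    static List<String> planTogether(List<String> sinks) {
        List<String> pipelines = new ArrayList<>();
        for (String sink : sinks) {
            pipelines.add("source#1 -> " + sink);
        }
        return pipelines;
    }

    /** Counts the distinct source operators across all pipelines. */
    static int countSources(List<String> pipelines) {
        Set<String> sources = new LinkedHashSet<>();
        for (String pipeline : pipelines) {
            sources.add(pipeline.substring(0, pipeline.indexOf(" -> ")));
        }
        return sources.size();
    }

    public static void main(String[] args) {
        // The three outputs from the issue: the printed stream and the two upsert sinks.
        List<String> sinks = Arrays.asList("print", "inserttable", "inserttableproc");
        System.out.println("eager:    " + countSources(planEagerly(sinks)) + " source(s)");
        System.out.println("buffered: " + countSources(planTogether(sinks)) + " source(s)");
    }
}
```

In this model the eager plan ends up with three sources, which matches the three source threads described in the issue, and the buffered plan ends up with one. Note that `TableEnvironment` has no `toAppendStream` method, so the printed stream would also have to be written through a table sink for all three outputs to be planned together.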

> Segment optimization does not work in blink ?
> ---------------------------------------------
>
>                 Key: FLINK-18145
>                 URL: https://issues.apache.org/jira/browse/FLINK-18145
>             Project: Flink
>          Issue Type: Wish
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Minor
>         Attachments: image-2020-06-05-14-56-01-710.png, 
> image-2020-06-05-14-56-48-625.png, image-2020-06-05-14-57-11-287.png
>
>
> DAG Segment Optimization:
>  
> !image-2020-06-05-14-56-01-710.png|width=762,height=264!
> Code:
> {code:java}
>   StreamExecutionEnvironment env = EnvUtil.getEnv();
>   env.setParallelism(1);
>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>   EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
>   GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
>   tableEnv.registerTableSource("myTble", tableSource);
>   Table mytable = tableEnv.scan("myTble");
>   mytable.printSchema();
>   tableEnv.toAppendStream(mytable, Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2);
>   Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '30' SECOND), key");
>   tableproc.printSchema();
>   tableEnv.registerTable("t4", tableproc);
>   Table table = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '24' HOUR), key");
>   table.printSchema();
>   tableEnv.registerTable("t3", table);
>   String[] fields = new String[]{"key", "countkey", "tumblestart"};
>   TypeInformation[] fieldsType = new TypeInformation[3];
>   fieldsType[0] = Types.INT;
>   fieldsType[1] = Types.LONG;
>   fieldsType[2] = Types.SQL_TIMESTAMP;
>   PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields, fieldsType, true);
>   tableEnv.registerTableSink("inserttable", printTableSink);
>   tableEnv.sqlUpdate("insert into inserttable select key, countkey, tumblestart from t3");
>   String[] fieldsproc = new String[]{"key", "countkey", "tumblestart"};
>   TypeInformation[] fieldsTypeproc = new TypeInformation[3];
>   fieldsTypeproc[0] = Types.INT;
>   fieldsTypeproc[1] = Types.LONG;
>   fieldsTypeproc[2] = Types.SQL_TIMESTAMP;
>   PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc, fieldsTypeproc, true);
>   tableEnv.registerTableSink("inserttableproc", printTableSinkproc);
>   tableEnv.sqlUpdate("insert into inserttableproc select key, countkey, tumblestart from t4");
> {code}
> I have a custom table source, and then I:
>     (1) convert the table to a DataStream with the `toAppendStream` method, then sink it
>     (2) apply a tumbling window, then sink the result
>     (3) apply another tumbling window, then sink the result
> but the segment optimization didn't work.
>  
> !image-2020-06-05-14-57-11-287.png|width=546,height=388!  
>  
> *The source is executed by 3 threads and generates the same data 3 times*
>  
> !image-2020-06-05-14-56-48-625.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
