[ 
https://issues.apache.org/jira/browse/FLINK-21013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280752#comment-17280752
 ] 

Yuan Mei edited comment on FLINK-21013 at 2/8/21, 5:18 AM:
-----------------------------------------------------------

Hey [~Leonard Xu] and [~jark], what is the status of this ticket? 

I saw the PRs are ready to review, how far away these PRs to be merged?

I am asking because this ticket is marked as a blocker for Flink 1.12.2 
release, wondering is it still the case?


was (Author: ym):
Hey [~Leonard Xu] and [~jark], what is the status of this ticket? 

I saw the PRs are ready to review, how far away these PRs to be merged?

> Blink planner does not ingest timestamp into StreamRecord
> ---------------------------------------------------------
>
>                 Key: FLINK-21013
>                 URL: https://issues.apache.org/jira/browse/FLINK-21013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Timo Walther
>            Assignee: Leonard Xu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.2
>
>
> Currently, the rowtime attribute is not put into the StreamRecord when 
> leaving the Table API to DataStream API. The legacy planner supports this, 
> but the timestamp is null when using the Blink planner.
> {code}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>             EnvironmentSettings settings =
>                     
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> settings);
>         DataStream<Order> orderA =
>                 env.fromCollection(
>                         Arrays.asList(
>                                 new Order(1L, "beer", 3),
>                                 new Order(1L, "diaper", 4),
>                                 new Order(3L, "rubber", 2)));
>         DataStream<Order> orderB =
>                 orderA.assignTimestampsAndWatermarks(
>                         new AssignerWithPunctuatedWatermarks<Order>() {
>                             @Nullable
>                             @Override
>                             public Watermark checkAndGetNextWatermark(
>                                     Order lastElement, long 
> extractedTimestamp) {
>                                 return new Watermark(extractedTimestamp);
>                             }
>                             @Override
>                             public long extractTimestamp(Order element, long 
> recordTimestamp) {
>                                 return element.user;
>                             }
>                         });
>         Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), 
> $("product"), $("amount"));
>         // union the two tables
>         Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);
>         tEnv.toAppendStream(result, Row.class)
>                 .process(
>                         new ProcessFunction<Row, Row>() {
>                             @Override
>                             public void processElement(Row value, Context 
> ctx, Collector<Row> out)
>                                     throws Exception {
>                                 System.out.println("TIMESTAMP" + 
> ctx.timestamp());
>                             }
>                         })
>                 .print();
>         env.execute();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to