[
https://issues.apache.org/jira/browse/FLINK-21013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280752#comment-17280752
]
Yuan Mei commented on FLINK-21013:
----------------------------------
Hey [~Leonard Xu] and [~jark], what is the status of this ticket?
> Blink planner does not ingest timestamp into StreamRecord
> ---------------------------------------------------------
>
> Key: FLINK-21013
> URL: https://issues.apache.org/jira/browse/FLINK-21013
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 1.12.0
> Reporter: Timo Walther
> Assignee: Leonard Xu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.2
>
>
> Currently, the rowtime attribute is not put into the StreamRecord when
> leaving the Table API to DataStream API. The legacy planner supports this,
> but the timestamp is null when using the Blink planner.
> {code}
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> EnvironmentSettings settings =
>
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> settings);
> DataStream<Order> orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> DataStream<Order> orderB =
> orderA.assignTimestampsAndWatermarks(
> new AssignerWithPunctuatedWatermarks<Order>() {
> @Nullable
> @Override
> public Watermark checkAndGetNextWatermark(
> Order lastElement, long
> extractedTimestamp) {
> return new Watermark(extractedTimestamp);
> }
> @Override
> public long extractTimestamp(Order element, long
> recordTimestamp) {
> return element.user;
> }
> });
> Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(),
> $("product"), $("amount"));
> // union the two tables
> Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);
> tEnv.toAppendStream(result, Row.class)
> .process(
> new ProcessFunction<Row, Row>() {
> @Override
> public void processElement(Row value, Context
> ctx, Collector<Row> out)
> throws Exception {
> System.out.println("TIMESTAMP" +
> ctx.timestamp());
> }
> })
> .print();
> env.execute();
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)