[ 
https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073504#comment-17073504
 ] 

YufeiLiu commented on FLINK-16938:
----------------------------------

[~jark] Here is the test demo. I know using {{Instant}} might avoid the 
issue, but we have lots of legacy tasks whose UDFs are defined with the Timestamp type.
{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Row> source = env.fromElements(
        Row.of("first", System.currentTimeMillis()),
        Row.of("second", System.currentTimeMillis())
);
long outOfOrderness = 1000;
DataStream<Row> input = source.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.milliseconds(outOfOrderness)) {
            @Override
            public long extractTimestamp(Row element) {
                return (long) element.getField(1);
            }
        }).returns(Types.ROW_NAMED(new String[]{"name", "ts"}, Types.STRING, Types.LONG));
Table sourceTable = tEnv.fromDataStream(input, "name, ts.rowtime");
tEnv.createTemporaryView("source_table", sourceTable);
tEnv
        .connect(new FileSystem().path("/tmp/sink.csv"))
        .withSchema(
                new Schema()
                        .field("name", DataTypes.STRING())
                        .field("ts", DataTypes.TIMESTAMP(3))
        )
        .withFormat(new Csv())
        .inAppendMode()
        .createTemporaryTable("sink_table");
tEnv.insertInto("sink_table", sourceTable);
tEnv.execute("");
{code}
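The lag itself can be reproduced outside Flink with plain JDK calls. This is a minimal sketch of the 'millis -> LocalDateTime (UTC) -> Timestamp' round trip described in the issue; it only mirrors the conversion, it does not call SqlTimestamp itself.

{code:java}
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampLagDemo {
    public static void main(String[] args) {
        long from = System.currentTimeMillis();
        // Build the LocalDateTime from epoch millis interpreted in UTC.
        LocalDateTime utc = LocalDateTime.ofEpochSecond(
                from / 1000, (int) (from % 1000) * 1_000_000, ZoneOffset.UTC);
        // Timestamp.valueOf re-interprets that wall-clock time in the JVM
        // default zone, so the instant shifts by the default zone's offset.
        long to = Timestamp.valueOf(utc).getTime();
        long zoneOffsetMillis = ZoneId.systemDefault().getRules()
                .getOffset(Instant.ofEpochMilli(from)).getTotalSeconds() * 1000L;
        // (from - to) equals the default timezone offset at that instant.
        System.out.println(from - to);
        System.out.println(zoneOffsetMillis);
    }
}
{code}

On a JVM whose default zone is not UTC, both printed values are the same non-zero offset, which matches the lag seen in the sink.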


> SqlTimestamp has lag when convert long to Timestamp
> ---------------------------------------------------
>
>                 Key: FLINK-16938
>                 URL: https://issues.apache.org/jira/browse/FLINK-16938
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: YufeiLiu
>            Priority: Major
>
> When I set the rowtime attribute using the expression 'column.rowtime' and 
> the result type is sql.Timestamp, the result has a lag equal to the 
> default timezone offset.
> {code:java}
> tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
> {code}
> I looked into the conversion logic; the field goes through a 'long -> 
> SqlTimestamp -> Timestamp' conversion. 
> {code:java}
> long from = System.currentTimeMillis();
> long to = SqlTimestamp
>                       .fromEpochMillis(from)
>                       .toTimestamp()
>                       .getTime();
> {code}
> The result is {{from!=to}}. {{SqlTimestamp.toTimestamp()}} uses 
> {{Timestamp.valueOf(LocalDateTime dateTime)}}, which involves timezone 
> information and causes the time lag.
> Converting from Timestamp to Timestamp does not have this issue, but 
> converting a DataStream to a Table uses StreamRecord.timestamp as the rowtime field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
