[
https://issues.apache.org/jira/browse/FLINK-16938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073504#comment-17073504
]
YufeiLiu commented on FLINK-16938:
----------------------------------
[~jark] Here is the test demo. I know that using {{Instant}} might avoid the
issue, but we have many legacy tasks and UDFs that are defined with the Timestamp type.
{code:java}
final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(1);
final EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv =
    StreamTableEnvironment.create(env, settings);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStreamSource<Row> source = env.fromElements(
    Row.of("first", System.currentTimeMillis()),
    Row.of("second", System.currentTimeMillis())
);
long outOfOrderness = 1000;
DataStream<Row> input = source
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.milliseconds(outOfOrderness)) {
            @Override
            public long extractTimestamp(Row element) {
                return (long) element.getField(1);
            }
        })
    .returns(Types.ROW_NAMED(new String[]{"name", "ts"}, Types.STRING, Types.LONG));

Table sourceTable = tEnv.fromDataStream(input, "name, ts.rowtime");
tEnv.createTemporaryView("source_table", sourceTable);
tEnv
    .connect(new FileSystem().path("/tmp/sink.csv"))
    .withSchema(
        new Schema()
            .field("name", DataTypes.STRING())
            .field("ts", DataTypes.TIMESTAMP(3))
    )
    .withFormat(new Csv())
    .inAppendMode()
    .createTemporaryTable("sink_table");
tEnv.insertInto("sink_table", sourceTable);
tEnv.execute("");
{code}
> SqlTimestamp has lag when convert long to Timestamp
> ---------------------------------------------------
>
> Key: FLINK-16938
> URL: https://issues.apache.org/jira/browse/FLINK-16938
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: YufeiLiu
> Priority: Major
>
> When I set the rowtime attribute using the expression 'column.rowtime' and the
> result type is sql.Timestamp, the result lags by an amount equal to the
> default time zone offset.
> {code:java}
> tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
> {code}
> I looked into the conversion logic: the field goes through a 'long ->
> SqlTimestamp -> Timestamp' conversion.
> {code:java}
> long from = System.currentTimeMillis();
> long to = SqlTimestamp
> .fromEpochMillis(from)
> .toTimestamp()
> .getTime();
> {code}
> The result is {{from != to}}. {{SqlTimestamp.toTimestamp()}} uses
> {{Timestamp.valueOf(LocalDateTime dateTime)}}, which applies the default time
> zone and therefore causes the time lag.
> Converting from Timestamp to Timestamp does not have this issue, but converting
> a DataStream to a Table uses StreamRecord.timestamp as the rowtime field.
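
The lag described in the issue can probably be reproduced with JDK classes alone. The sketch below is only an approximation of the reported path: it does not call Flink's {{SqlTimestamp}}, and it assumes the intermediate {{LocalDateTime}} is derived from the epoch millis in UTC. The class name {{TimestampLagDemo}} and both method names are made up for illustration.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.TimeZone;

class TimestampLagDemo {

    // Assumed stand-in for 'long -> SqlTimestamp -> Timestamp':
    // epoch millis -> UTC-based LocalDateTime -> java.sql.Timestamp.
    static long roundTripViaLocalDateTime(long epochMillis) {
        LocalDateTime utcDateTime =
            LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        // Timestamp.valueOf reads the LocalDateTime as wall-clock time
        // in the JVM default time zone, which shifts the instant.
        return Timestamp.valueOf(utcDateTime).getTime();
    }

    // Direct conversion that never goes through a LocalDateTime.
    static long roundTripDirect(long epochMillis) {
        return new Timestamp(epochMillis).getTime();
    }

    public static void main(String[] args) {
        // Fix the default zone so the result does not depend on the machine.
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        long from = 1585699200000L; // 2020-04-01T00:00:00Z

        long shifted = roundTripViaLocalDateTime(from);
        long direct = roundTripDirect(from);

        System.out.println("lag via LocalDateTime (ms): " + (from - shifted)); // 28800000
        System.out.println("lag via direct conversion (ms): " + (from - direct)); // 0
    }
}
```

With the default zone set to UTC+8, the first path comes out eight hours behind, which matches the "lag equal to the default time zone offset" from the description, while the direct conversion round-trips unchanged.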