[
https://issues.apache.org/jira/browse/FLINK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330877#comment-17330877
]
Leonard Xu edited comment on FLINK-22356 at 4/23/21, 3:52 PM:
--------------------------------------------------------------
After an offline discussion with Timo and Jark, we reached a consensus on the
quick fix.
There are two user cases that hit this issue:
1. A user reported on the mailing list that they defined a long value
({{System.currentTimeMillis}}, e.g. 0) in DataStream and used
{{java.sql.Timestamp}} (e.g. {{1970-01-01 08:00:00}} in Shanghai) to define the
row time, then converted it to a Table. The watermark in the Table is inherited
from the DataStream, and when the partition time field is derived from the
event time field, this issue happens as soon as the watermark is compared with
the partition time field. I admit this is a wrong usage, but some users do it.
2. The case described in this issue: the user defined a watermark on a
TIMESTAMP_LTZ column, and the partition-time field is aligned with the local
timestamp; this issue happens.
We think the simplest way is to add a configuration option for the
filesystem/hive connectors that is used when comparing the long watermark value
with the partition timestamp string.
Thus, we finally proposed a configuration option to fix this case:
*`sink.partition-commit.watermark-time-zone`*: the time zone used to parse the
long watermark value into a TIMESTAMP value; the parsed watermark timestamp is
compared with the partition time to decide whether the partition should be
committed. The default value is 'UTC', which means the watermark is defined on
a TIMESTAMP column or not defined at all. If the watermark is defined on a
TIMESTAMP_LTZ column, the time zone of the watermark is the user-configured
local time zone, so this option should be set to that time zone.
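To make the proposal concrete, here is a sketch of how the option could be set
on the filesystem sink from the reproduction below. The option name and the
'Asia/Shanghai' value follow the proposal above; the rest of the DDL mirrors
the reporter's {{fs_table2}} definition (the path is a placeholder), and the
exact behavior is still subject to the final implementation.
{code:sql}
-- Sketch: the watermark is defined on a TIMESTAMP_LTZ column (see mysource
-- in the reproduction below), so tell the sink to parse the long watermark
-- value in the local time zone before comparing it with the partition time.
CREATE TABLE fs_table2 (
  ms BIGINT,
  dt STRING,
  `hour` STRING,
  `mm` STRING
) PARTITIONED BY (dt, `hour`, `mm`) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/fs_table2',   -- placeholder path
  'format' = 'csv',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1min',
  'sink.partition-commit.policy.kind' = 'success-file',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00',
  -- proposed option: parse the watermark in the session time zone
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai'
);
{code}
For the data in the reproduction below, the watermark derived from
ms = 1618839600000 is 1618839599999, i.e. 2021-04-19 13:39:59.999 when parsed
as UTC, while the extracted partition time '2021-04-19 21:40:00' is
Asia/Shanghai local time, so the watermark never passes partition time + delay
and the partition is never committed. With the option set to 'Asia/Shanghai',
the same watermark is parsed as 2021-04-19 21:39:59.999 and the comparison
works as expected.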
was (Author: leonard xu):
After an offline discussion with Timo and Jark, we reached a consensus on the
quick fix.
There are two user cases that hit this issue:
1. A user reported on the mailing list that they defined a long value
({{System.currentTimeMillis}}, e.g. 0) in DataStream and used
{{java.sql.Timestamp}} (e.g. {{1970-01-01 08:00:00}} in Shanghai) to define the
row time, then converted it to a Table. The watermark in the Table is inherited
from the DataStream, and when the partition time field is derived from the
event time field, this issue happens as soon as the watermark is compared with
the partition time field. I admit this is a wrong usage, but some users do it.
2. The case described in this issue: the user defined a watermark on a
TIMESTAMP_LTZ column, and the partition-time field is aligned with the local
timestamp; this issue happens.
We think the simplest way is to add a configuration option for the
filesystem/hive connectors that is used when comparing the long watermark value
with the partition timestamp string. This covers the two cases above, and also
a minor case where the user's partition time field differs from the event time
(e.g. produced with {{LOCALTIMESTAMP}}): there, too, a configured time zone is
needed when comparing the long watermark value with the partition time field.
Thus, I finally proposed a configuration option to fix this case:
*`sink.partition-commit.watermark-time-zone`*: the time zone used to parse the
long watermark value into a TIMESTAMP value; the parsed watermark timestamp is
compared with the partition time to decide whether the partition should be
committed. The default value is 'UTC', which means the watermark is defined on
a TIMESTAMP column or not defined at all. If the watermark is defined on a
TIMESTAMP_LTZ column, the time zone of the watermark is the user-configured
local time zone, so this option should be set to that time zone.
> Filesystem/Hive partition file is not committed when watermark is applied on
> rowtime of TIMESTAMP_LTZ type
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-22356
> URL: https://issues.apache.org/jira/browse/FLINK-22356
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem, Connectors / Hive, Table SQL /
> API
> Reporter: Jark Wu
> Assignee: Leonard Xu
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.13.0, 1.14.0
>
>
> {code:sql}
> set execution.checkpointing.interval = 10s;
> set table.local-time-zone = 'Asia/Shanghai';
> create table mysource (
> ms bigint,
> ts as to_timestamp_ltz(ms, 3),
> watermark for ts as ts - interval '0.001' second
> ) with (
> 'connector' = 'socket',
> 'format' = 'json',
> 'hostname' = '127.0.0.1',
> 'port' = '9999'
> );
> CREATE TABLE fs_table2 (
> ms bigint,
> dt STRING,
> `hour` STRING,
> `mm` string
> ) PARTITIONED BY (dt, `hour`, `mm`) WITH (
> 'connector'='filesystem',
> 'path'='/Users/wuchong/Downloads/fs_table2',
> 'format'='csv',
> 'sink.partition-commit.delay'='1min',
> 'sink.partition-commit.policy.kind'='success-file',
> 'sink.rolling-policy.rollover-interval' = '30s',
> 'sink.rolling-policy.check-interval' = '30s',
> 'sink.partition-commit.trigger'='partition-time',
> 'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00'
> );
> insert into fs_table2
> SELECT ms,
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM mysource;
> {code}
> Enter some data via the socket:
> {code}
> > nc -lk 9999
> {"ms": 1618839600000}
> {"ms": 1618839600123}
> {"ms": 1618839600456}
> {"ms": 1618839600789}
> {"ms": 1618839660000}
> {"ms": 1618839660123}
> {"ms": 1618839660456}
> {"ms": 1618839660789}
> {"ms": 1618839720000}
> {"ms": 1618839780000}
> {"ms": 1618839840000}
> {"ms": 1618839900000}
> {"ms": 1618839960000}
> {"ms": 1618840020000}
> {code}
> However, none of the files are committed (no {{_SUCCESS}} file):
> {code}
> ➜ hour=21 tree
> .
> ├── mm=40
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-0
> ├── mm=41
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-1
> ├── mm=42
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-2
> ├── mm=43
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-3
> ├── mm=44
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-4
> ├── mm=45
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-5
> ├── mm=46
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-6
> └── mm=47
> └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-7
> 8 directories, 8 files
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)