[ 
https://issues.apache.org/jira/browse/FLINK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330877#comment-17330877
 ] 

Leonard Xu edited comment on FLINK-22356 at 4/23/21, 3:52 PM:
--------------------------------------------------------------

After an offline discussion with Timo and Jark, we reached a consensus on the quick
fix.

We see two user cases that hit this issue:

1. A user reported on the mailing list that they defined a long value
({{System.currentTimeMillis}}, e.g. 0) in a DataStream, used {{java.sql.Timestamp}}
(e.g. {{1970-01-01 08:00:00}} in Shanghai) to define the row time, and then converted
the stream to a Table. The watermark in the Table is inherited from the DataStream,
so if the partition-time field is derived from the event-time field, the issue
happens when the watermark is compared with the partition-time field. I admit this
is wrong usage, but some users do it.

2. The case described in this issue: the user defined a watermark on a
TIMESTAMP_LTZ column and the partition-time field is aligned with the local
timestamp, and the issue happens.

We think the simplest fix is to add a configuration option for the filesystem/Hive
connectors that is used when comparing the long watermark value with the partition
timestamp string. This covers both cases, and also a minor case where the user's
partition-time field differs from the event time (e.g. produced with
{{LOCALTIMESTAMP}}), for which a configured time zone is likewise needed in the
comparison.

Thus, we finally propose the option *{{sink.partition-commit.watermark-time-zone}}*:
the time zone used to parse the long watermark value into a TIMESTAMP value; the
parsed watermark timestamp is then compared with the partition time to decide
whether the partition should be committed. The default value is 'UTC', which means
the watermark is defined on a TIMESTAMP column or not defined at all. If the
watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is
the user-configured time zone, so the value of this option should be that
user-configured local time zone.
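
To make the comparison concrete with the numbers from the description below: the
first record {{1618839600000}} is 2021-04-19 21:40:00 in Asia/Shanghai (hence the
{{hour=21/mm=40}} partition) but 2021-04-19 13:40:00 in UTC, so with the default
'UTC' the watermark and the extracted partition time stay eight hours apart and the
partition is never committed. Below is a minimal sketch of how the proposed option
could be set on the {{fs_table2}} table from the description (assuming the option is
exposed under exactly the name proposed above):

{code:sql}
-- Sketch: fs_table2 from the description, with the proposed option set to the
-- session time zone so that the long watermark value is parsed in Asia/Shanghai
-- before it is compared with the extracted partition time.
-- (rolling-policy options from the description are omitted for brevity)
CREATE TABLE fs_table2 (
  ms BIGINT,
  dt STRING,
  `hour` STRING,
  `mm` STRING
) PARTITIONED BY (dt, `hour`, `mm`) WITH (
  'connector' = 'filesystem',
  'path' = '/Users/wuchong/Downloads/fs_table2',
  'format' = 'csv',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1min',
  'sink.partition-commit.policy.kind' = 'success-file',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00',
  -- proposed option: parse the watermark in the same zone the partition time uses
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai'
);
{code}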
  



> Filesystem/Hive partition file is not committed when watermark is applied on 
> rowtime of TIMESTAMP_LTZ type
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22356
>                 URL: https://issues.apache.org/jira/browse/FLINK-22356
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / 
> API
>            Reporter: Jark Wu
>            Assignee: Leonard Xu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.14.0
>
>
> {code:sql}
> set execution.checkpointing.interval = 10s;
> set table.local-time-zone = 'Asia/Shanghai';
> create table mysource (
>   ms bigint,
>   ts as to_timestamp_ltz(ms, 3),
>   watermark for ts as ts - interval '0.001' second
> ) with (
>   'connector' = 'socket',
>   'format' = 'json',
>   'hostname' = '127.0.0.1',
>   'port' = '9999'
> );
> CREATE TABLE fs_table2 (
>   ms bigint,
>   dt STRING,
>   `hour` STRING,
>   `mm` STRING
> ) PARTITIONED BY (dt, `hour`, `mm`) WITH (
>   'connector'='filesystem',
>   'path'='/Users/wuchong/Downloads/fs_table2',
>   'format'='csv',
>   'sink.partition-commit.delay'='1min',
>   'sink.partition-commit.policy.kind'='success-file',
>   'sink.rolling-policy.rollover-interval' = '30s',
>   'sink.rolling-policy.check-interval' = '30s',
>   'sink.partition-commit.trigger'='partition-time',
>   'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00'
> );
> insert into  fs_table2
> SELECT ms,
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM mysource;
> {code}
> Enter some data into the socket:
> {code}
> > nc -lk 9999
> {"ms": 1618839600000}
> {"ms": 1618839600123}
> {"ms": 1618839600456}
> {"ms": 1618839600789}
> {"ms": 1618839660000}
> {"ms": 1618839660123}
> {"ms": 1618839660456}
> {"ms": 1618839660789}
> {"ms": 1618839720000}
> {"ms": 1618839780000}
> {"ms": 1618839840000}
> {"ms": 1618839900000}
> {"ms": 1618839960000}
> {"ms": 1618840020000}
> {code}
> However, none of the files are committed (there is no {{_SUCCESS}} file):
> {code}
> ➜  hour=21 tree
> .
> ├── mm=40
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-0
> ├── mm=41
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-1
> ├── mm=42
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-2
> ├── mm=43
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-3
> ├── mm=44
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-4
> ├── mm=45
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-5
> ├── mm=46
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-6
> └── mm=47
>     └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-7
> 8 directories, 8 files
> {code}


