[
https://issues.apache.org/jira/browse/FLINK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328471#comment-17328471
]
Leonard Xu edited comment on FLINK-22356 at 4/22/21, 12:48 PM:
---------------------------------------------------------------
I found two bad cases :( after rethinking our solution:
1. the query may not contain the rowtime field;
2. the user may apply an implicit cast, e.g. cast a TIMESTAMP_LTZ field to a TIMESTAMP field.
I think we need a watermark metadataHandler to infer the type of the watermark output (the time attribute field) and then expose it to the sink.
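Here is a minimal sketch of the proposed idea. It assumes the planner could hand the sink the type of the time attribute the watermark is defined on. `WatermarkType` and `partition_time_to_epoch_millis` are hypothetical names and are not Flink APIs, and 'Asia/Shanghai' is modeled as a fixed UTC+8 offset. The sink would use the inferred type to decide which zone to read a zone-less partition time in before comparing it with the watermark:

```python
from datetime import datetime, timedelta, timezone, tzinfo
from enum import Enum

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class WatermarkType(Enum):
    """Hypothetical: type of the time attribute the watermark is defined on,
    as inferred by the planner and exposed to the sink."""
    TIMESTAMP = "TIMESTAMP"
    TIMESTAMP_LTZ = "TIMESTAMP_LTZ"


def partition_time_to_epoch_millis(partition_time: datetime,
                                   wm_type: WatermarkType,
                                   local_tz: tzinfo) -> int:
    """Hypothetical helper: put a zone-less partition time on the watermark's scale."""
    # TIMESTAMP_LTZ watermark: a real epoch instant, so partition values that were
    # rendered in the session time zone must be read back in that zone.
    # TIMESTAMP watermark: millis of the wall-clock value read as UTC.
    zone = local_tz if wm_type is WatermarkType.TIMESTAMP_LTZ else timezone.utc
    return (partition_time.replace(tzinfo=zone) - EPOCH) // timedelta(milliseconds=1)


shanghai = timezone(timedelta(hours=8))  # 'Asia/Shanghai' as a fixed UTC+8 offset
part = datetime(2021, 4, 19, 21, 40)     # dt=2021-04-19 / hour=21 / mm=40
print(partition_time_to_epoch_millis(part, WatermarkType.TIMESTAMP_LTZ, shanghai))  # 1618839600000
print(partition_time_to_epoch_millis(part, WatermarkType.TIMESTAMP, shanghai))      # 1618868400000
```

With the LTZ type, the partition dt=2021-04-19/hour=21/mm=40 maps to the epoch value of the first record in the repro rather than a value 8 hours later. The two bad cases above explain why the type has to come from the watermark and not from the sink's own columns: the repro's SELECT does not project `ts` at all, and a cast to TIMESTAMP would hide the LTZ origin.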
> Filesystem/Hive partition file is not committed when watermark is applied on
> rowtime of TIMESTAMP_LTZ type
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-22356
> URL: https://issues.apache.org/jira/browse/FLINK-22356
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem, Connectors / Hive, Table SQL / API
> Reporter: Jark Wu
> Assignee: Leonard Xu
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.13.0, 1.14.0
>
>
> {code:sql}
> set execution.checkpointing.interval = 10s;
> set table.local-time-zone = 'Asia/Shanghai';
> create table mysource (
>   ms bigint,
>   -- computed rowtime column of type TIMESTAMP_LTZ(3)
>   ts as to_timestamp_ltz(ms, 3),
>   watermark for ts as ts - interval '0.001' second
> ) with (
>   'connector' = 'socket',
>   'format' = 'json',
>   'hostname' = '127.0.0.1',
>   'port' = '9999'
> );
> CREATE TABLE fs_table2 (
>   ms bigint,
>   dt STRING,
>   `hour` STRING,
>   `mm` string
> ) PARTITIONED BY (dt, `hour`, `mm`) WITH (
>   'connector'='filesystem',
>   'path'='/Users/wuchong/Downloads/fs_table2',
>   'format'='csv',
>   'sink.partition-commit.delay'='1min',
>   'sink.partition-commit.policy.kind'='success-file',
>   'sink.rolling-policy.rollover-interval' = '30s',
>   'sink.rolling-policy.check-interval' = '30s',
>   -- commit a partition once the watermark passes its extracted time plus the delay
>   'sink.partition-commit.trigger'='partition-time',
>   'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00'
> );
> -- partition values are formatted from the TIMESTAMP_LTZ rowtime in the session time zone
> insert into fs_table2
> SELECT ms,
>   DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM mysource;
> {code}
> Enter some data into the socket:
> {code}
> > nc -lk 9999
> {"ms": 1618839600000}
> {"ms": 1618839600123}
> {"ms": 1618839600456}
> {"ms": 1618839600789}
> {"ms": 1618839660000}
> {"ms": 1618839660123}
> {"ms": 1618839660456}
> {"ms": 1618839660789}
> {"ms": 1618839720000}
> {"ms": 1618839780000}
> {"ms": 1618839840000}
> {"ms": 1618839900000}
> {"ms": 1618839960000}
> {"ms": 1618840020000}
> {code}
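To see which partition each record lands in, the partition values can be reproduced in Python. This sketch assumes that DATE_FORMAT renders the TIMESTAMP_LTZ rowtime in the session time zone and that 'Asia/Shanghai' behaves as a fixed UTC+8 offset (it has no DST today). `partition_of` is a hypothetical helper used only for illustration:

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Assumption: 'table.local-time-zone' = 'Asia/Shanghai' behaves as a fixed UTC+8 offset.
SHANGHAI = timezone(timedelta(hours=8))

# The "ms" values typed into the socket above.
inputs_ms = [
    1618839600000, 1618839600123, 1618839600456, 1618839600789,
    1618839660000, 1618839660123, 1618839660456, 1618839660789,
    1618839720000, 1618839780000, 1618839840000, 1618839900000,
    1618839960000, 1618840020000,
]


def partition_of(ms: int) -> Tuple[str, str, str]:
    """(dt, hour, mm) as DATE_FORMAT would render to_timestamp_ltz(ms, 3) in UTC+8."""
    local = datetime.fromtimestamp(ms // 1000, tz=SHANGHAI)
    return local.strftime("%Y-%m-%d"), local.strftime("%H"), local.strftime("%M")


for ms in inputs_ms:
    print(ms, "/".join(partition_of(ms)))
```

The 14 records fall into the eight partitions hour=21/mm=40 through mm=47, which matches the eight directories in the listing that follows.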
> However, none of the partitions are committed (there is no {{_SUCCESS}} file):
> {code}
> ➜ hour=21 tree
> .
> ├── mm=40
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-0
> ├── mm=41
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-1
> ├── mm=42
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-2
> ├── mm=43
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-3
> ├── mm=44
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-4
> ├── mm=45
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-5
> ├── mm=46
> │ └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-6
> └── mm=47
> └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-7
> 8 directories, 8 files
> {code}
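The symptom can be explained with a simplified model of the partition-time trigger, which commits a partition once the watermark has passed the extracted partition time plus 'sink.partition-commit.delay'. The model makes two assumptions. First, the zone-less time built from '$dt $hour:$mm:00' is converted to epoch millis as if it were UTC. Second, a watermark on a TIMESTAMP_LTZ rowtime is a real epoch value. This is an illustration only, not Flink's trigger code, and it ignores checkpoint timing. `to_millis` and `can_commit` are hypothetical names:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
SHANGHAI = timezone(timedelta(hours=8))  # 'table.local-time-zone' = 'Asia/Shanghai' (UTC+8, no DST)
COMMIT_DELAY_MS = 60_000                 # 'sink.partition-commit.delay' = '1min'


def to_millis(partition_time: datetime, zone: timezone) -> int:
    """Epoch millis of a zone-less partition time read in the given zone."""
    return (partition_time.replace(tzinfo=zone) - EPOCH) // timedelta(milliseconds=1)


def can_commit(watermark_ms: int, partition_ms: int) -> bool:
    # Simplified trigger: commit once the watermark passes partition time + delay.
    return watermark_ms > partition_ms + COMMIT_DELAY_MS


# Largest input is {"ms": 1618840020000}; the watermark lags it by 1 ms.
watermark_ms = 1618840020000 - 1

# Partitions dt=2021-04-19/hour=21/mm=40..47 as extracted by '$dt $hour:$mm:00'.
partitions = [datetime(2021, 4, 19, 21, mm) for mm in range(40, 48)]

committed_as_utc = [p.minute for p in partitions
                    if can_commit(watermark_ms, to_millis(p, timezone.utc))]
committed_as_local = [p.minute for p in partitions
                      if can_commit(watermark_ms, to_millis(p, SHANGHAI))]

print(committed_as_utc)    # []
print(committed_as_local)  # [40, 41, 42, 43, 44, 45]
```

If the partition time is read as UTC, every partition appears 8 hours ahead of the epoch-based watermark, so nothing commits. This matches the missing {{_SUCCESS}} files. If the partition time is read in the session zone, mm=40 through mm=45 would already be committable, and mm=46 and mm=47 would still wait for later data.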
--
This message was sent by Atlassian Jira
(v8.3.4#803005)