[ 
https://issues.apache.org/jira/browse/FLINK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326626#comment-17326626
 ] 

Timo Walther commented on FLINK-22356:
--------------------------------------

I would like to avoid making {{DynamicTableSink.Context}} more complicated. It 
took a long time to make the interfaces as concise as possible. Can't we fix 
the issue on the Hive side only? The sink factory could simply access the 
resolved schema, check the watermark type, and pass it to the sink, no? Btw, 
isn't any usage of 
{{org.apache.flink.table.filesystem.DefaultPartTimeExtractor#toMills}} invalid, 
not only the one in {{PartitionTimeCommitter}}?
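
To illustrate the concern about {{toMills}}, here is a minimal, self-contained 
Java sketch using the first and last records from the issue description. It is 
not Flink code: the class and method names are hypothetical, and it assumes 
that the partition time string is converted to epoch millis as if it were UTC 
and that a partition becomes committable once the watermark passes partition 
time plus the commit delay.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionTimeSketch {

    // Hypothetical stand-in for a conversion that treats the partition time as UTC.
    static long toMillisAssumingUtc(LocalDateTime partitionTime) {
        return partitionTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Hypothetical zone-aware variant that honors the session time zone.
    static long toMillisInZone(LocalDateTime partitionTime, ZoneId zone) {
        return partitionTime.atZone(zone).toInstant().toEpochMilli();
    }

    // Assumed commit rule: the watermark must pass partition time plus delay.
    static boolean isCommittable(long watermark, long partitionMillis, long delayMillis) {
        return watermark > partitionMillis + delayMillis;
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // table.local-time-zone in the issue
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        // First record of the reproduction; it lands in partition hour=21, mm=40.
        long eventMs = 1618839600000L;
        String partition =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(eventMs), zone).format(fmt);
        LocalDateTime partitionTime = LocalDateTime.parse(partition, fmt);

        // Watermark after the last record: ts - interval '0.001' second.
        long watermark = 1618840020000L - 1;
        long delay = 60_000L; // 'sink.partition-commit.delay'='1min'

        long utcMillis = toMillisAssumingUtc(partitionTime);
        long zonedMillis = toMillisInZone(partitionTime, zone);

        System.out.println("partition time:         " + partition);
        System.out.println("offset of UTC reading:  " + (utcMillis - eventMs) + " ms");
        System.out.println("committable (UTC):      "
                + isCommittable(watermark, utcMillis, delay));
        System.out.println("committable (in zone):  "
                + isCommittable(watermark, zonedMillis, delay));
    }
}
```

Under these assumptions the UTC reading puts the partition time 8 hours ahead 
of the epoch-based watermark, so the partition is not committable, while the 
zone-aware reading makes it committable right after the delay.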

> Filesystem/Hive partition file is not committed when watermark is applied on 
> rowtime of TIMESTAMP_LTZ type
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22356
>                 URL: https://issues.apache.org/jira/browse/FLINK-22356
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / 
> API
>            Reporter: Jark Wu
>            Assignee: Leonard Xu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.14.0
>
>
> {code:sql}
> set execution.checkpointing.interval = 10s;
> set table.local-time-zone = 'Asia/Shanghai';
> create table mysource (
>   ms bigint,
>   ts as to_timestamp_ltz(ms, 3),
>   watermark for ts as ts - interval '0.001' second
> ) with (
>   'connector' = 'socket',
>   'format' = 'json',
>   'hostname' = '127.0.0.1',
>   'port' = '9999'
> );
> CREATE TABLE fs_table2 (
>   ms bigint,
>   dt STRING,
>   `hour` STRING,
>   `mm` string
> ) PARTITIONED BY (dt, `hour`, `mm`) WITH (
>   'connector'='filesystem',
>   'path'='/Users/wuchong/Downloads/fs_table2',
>   'format'='csv',
>   'sink.partition-commit.delay'='1min',
>   'sink.partition-commit.policy.kind'='success-file',
>   'sink.rolling-policy.rollover-interval' = '30s',
>   'sink.rolling-policy.check-interval' = '30s',
>   'sink.partition-commit.trigger'='partition-time',
>   'partition.time-extractor.timestamp-pattern' = '$dt $hour:$mm:00'
> );
> insert into fs_table2
> SELECT ms,
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM mysource;
> {code}
> Enter some data into the socket:
> {code}
> > nc -lk 9999
> {"ms": 1618839600000}
> {"ms": 1618839600123}
> {"ms": 1618839600456}
> {"ms": 1618839600789}
> {"ms": 1618839660000}
> {"ms": 1618839660123}
> {"ms": 1618839660456}
> {"ms": 1618839660789}
> {"ms": 1618839720000}
> {"ms": 1618839780000}
> {"ms": 1618839840000}
> {"ms": 1618839900000}
> {"ms": 1618839960000}
> {"ms": 1618840020000}
> {code}
> However, none of the partitions are committed (no {{_SUCCESS}} file is written):
> {code}
> ➜  hour=21 tree
> .
> ├── mm=40
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-0
> ├── mm=41
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-1
> ├── mm=42
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-2
> ├── mm=43
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-3
> ├── mm=44
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-4
> ├── mm=45
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-5
> ├── mm=46
> │   └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-6
> └── mm=47
>     └── part-cf06c6da-d301-4623-832c-9e0f356f6fb4-0-7
> 8 directories, 8 files
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
