[
https://issues.apache.org/jira/browse/FLINK-21871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495545#comment-17495545
]
Piotr Nowojski commented on FLINK-21871:
----------------------------------------
Hey [~jark]. I have only now noticed this ticket. Can you elaborate more how
are you planning to send and progress the watermarks? As I understand the
"implementation plan", it sounds like you want to implement this purely in this
particular source? I'm not sure if you are aware, but the watermark progression
for file based sources was being considered as part of
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-OverallDesign],
however this part of the design seems to not be implemented yet.
I believe the intention back then was to have the
SourceCoordinator/SplitEnumerator to emit global watermark (in other words max
allowed watermark) to the source readers, based on the
SourceCoordinator/SplitEnumerator knowledge what splits have been generated and
assigned and what splits will be generated/assigned in the future. For example
KafkaSource was supposed to wait for all assignment of all of the initial
splits, and then simply emit MAX_VALUE as the global watermark, basically
unblocking the already existing per split watermark mechanism. Note that even
that hasn't been implemented so far (FLINK-26018), but that I believe was the
intention of the FLIP-27 design.
For the FileSource I believe the intention was to use the same mechanism. After
generating and assigning all splits up to 2021-03-19 12:00:00,
SourceCoordinator/SplitEnumerator would send "2021-03-19 12:00:00" as the
global/max allowed watermark. Once all splits for the next hour were sent out
2021-03-19 13:00:00, the global watermark would have been bumped again. To make
this work, I see we are also missing one more mechanism. I think at the end of
every processed split, such split should emit "MAX_WATERMARK", and maybe file
splits should emit their's starting watermark.
Anyway, I wanted to make sure that we won't end up implementing some special
handling of this problem just inside the hive file system source, and I wanted
to point out other efforts in this area.
CC [~fpaul] [~twalthr] [~arvid]
> Support watermark for Hive and Filesystem streaming source
> ----------------------------------------------------------
>
> Key: FLINK-21871
> URL: https://issues.apache.org/jira/browse/FLINK-21871
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem, Connectors / Hive, Table SQL /
> API
> Reporter: Jark Wu
> Priority: Major
> Labels: auto-unassigned, pull-request-available
> Fix For: 1.15.0
>
>
> Hive and Filesystem already support streaming source. However, they doesn't
> support watermark on the source. That means users can't leverage the
> streaming source to perform the Flink powerful streaming analysis, e.g.
> window aggregate, interval join, and so on.
> In order to make more Hive users can leverage Flink to perform streaming
> analysis, and also cooperate with the new optimized window-TVF operations
> (FLIP-145), we need to support watermark for Hive and Filesystem.
> h2. How to emit watermark in Hive and Filesystem
> Factual data in Hive are usually partitioned by date time, e.g.
> {{pt_day=2021-03-19, pt_hour=10}}. In this case, when the data of partition
> {{pt_day=2021-03-19, pt_hour=10}} are emitted, we should be able to know all
> the data before {{2021-03-19 11:00:00}} have been arrived, so we can emit a
> watermark value of {{2021-03-19 11:00:00}}. We call this partition watermark.
> The partition watermark is much better than record watermark (extract
> watermark from record, e.g. {{ts - INTERVAL '1' MINUTE}}). Because in above
> example, if we are using partition watermark, the window of [10:00, 11:00)
> will be triggered when pt_hour=10 is finished. However, if we are using
> record watermark, the window of [10:00, 11:00) will be triggered when
> pt_hour=11 is arrived, that will make the pipeline have one more partition
> dely.
> Therefore, we firstly focus on support partition watermark for Hive and
> Filesystem.
> h2. Example
> In order to support such watermarks, we propose using the following DDL to
> define a Hive table with watermark defined:
> {code:sql}
> -- using hive dialect
> CREATE TABLE hive_table (
> x int,
> y string,
> z int,
> ts timestamp,
> WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day string, pt_hour string)
> TBLPROPERTIES (
> 'streaming-source.enable'='true',
> 'streaming-source.monitor-interval'='1s',
> 'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
> 'partition.time-interval'='1h'
> );
> -- window aggregate on the hive table
> SELECT window_start, window_end, COUNT(*), MAX(y), SUM(z)
> FROM TABLE(
> TUMBLE(TABLE hive_table, DESCRIPTOR(ts), INTERVAL '1' HOUR))
> GROUP BY window_start, window_end;
> {code}
> For filesystem connector, the DDL can be:
> {code:sql}
> CREATE TABLE fs_table (
> x int,
> y string,
> z int,
> ts TIMESTAMP(3),
> pt_day string,
> pt_hour string,
> WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day, pt_hour)
> WITH (
> 'connector' = 'filesystem',
> 'path' = '/path/to/file',
> 'format' = 'parquet',
> 'streaming-source.enable'='true',
> 'streaming-source.monitor-interval'='1s',
> 'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
> 'partition.time-interval'='1h'
> );
> {code}
> I will explain the new function/configuration.
> h2. SOURCE_WATERMARK built-in function
> FLIP-66[1] proposed {{SYSTEM_WATERMARK}} function for watermarks preserved in
> underlying source system.
> However, the SYSTEM prefix sounds like a Flink system generated value, but
> actually, this is a SOURCE system generated value.
> So I propose to use {{SOURCE_WATERMARK}} intead, this also keeps the concept
> align with the API of
> {{org.apache.flink.table.descriptors.Rowtime#watermarksFromSource}}.
> h2. Table Options for Watermark
> - {{partition.time-extractor.timestamp-pattern}}: this option already exists.
> This is used to extract/convert partition value to a timestamp value.
> - {{partition.time-interval}}: this is a new option. It indicates the minimal
> time interval of the partitions. It's used to calculate the correct watermark
> when a partition is finished. The watermark = partition-timestamp +
> time-inteval.
> h2. How to support watermark for existing Hive tables
> We all know that we can't create a new table for an existing Hive table. So
> we should support altering existing Hive table to add the watermark
> inforamtion.
> This can be supported by the new ALTER TABLE syntax proposed in FLINK-21634.
> Because watermark, computed column, table options are all encoded in Hive
> table parameters,
> so other systems (e.g. Hive MR, Spark) can still read this Hive table as
> usual.
> {code:sql}
> ALTER TABLE hive_table ADD (
> WATERMARK FOR ts AS SOURCE_WATERMARK
> );
> ALTER TABLE hive_table SET (
> 'streaming-source.enable'='true',
> 'streaming-source.monitor-interval'='1s',
> 'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
> 'partition.time-interval'='1h'
> );
> {code}
> h2. Implementation Details
> 1. SplitEnumerator: monitors new partitions throught {{PartitionMonitor}},
> sorts partitions by partition name, adds new splits of new partitions to
> {{SplitAssigner}}, and tags the last split of each partition.
> 2. SourceReader: request split to SplitEnumerator, when setup or read out a
> split.
> 3. SplitEnumerator: get split from SplitAssigner, assigned it to the
> requested reader. If the split is the last one of the partition, then
> broadcast a watermark event to all the readers.
> 4. SourceReader receive split: start to read data of the assigned split
> 5. SourceReader recieve watermark: If there is assigned splits, output
> received watermark when splits are read out. If no assigned splits, output
> received watermark right now.
> Note: the SplitAssigner should assign splits in FIFO order.
> The above implementation doesn't require new interface or new method of
> FLIP-27 source. All can be implemented in Hive/Filesystem connector module.
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL
--
This message was sent by Atlassian Jira
(v8.20.1#820001)