[
https://issues.apache.org/jira/browse/FLINK-21871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495832#comment-17495832
]
Jark Wu commented on FLINK-21871:
---------------------------------
Hi [~pnowojski], yes, I planned to implement it in the Hive source. But it
would be great if the SourceCoordinator/SplitEnumerator had a mechanism to emit
the max allowed watermark according to the finished splits. That would make the
implementation of the Hive source watermark much simpler.
> Support watermark for Hive and Filesystem streaming source
> ----------------------------------------------------------
>
> Key: FLINK-21871
> URL: https://issues.apache.org/jira/browse/FLINK-21871
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem, Connectors / Hive, Table SQL /
> API
> Reporter: Jark Wu
> Priority: Major
> Labels: auto-unassigned, pull-request-available
> Fix For: 1.15.0
>
>
> Hive and Filesystem already support streaming source. However, they don't
> support watermarks on the source. That means users can't leverage the
> streaming source to perform Flink's powerful streaming analysis, e.g.
> window aggregate, interval join, and so on.
> To let more Hive users leverage Flink to perform streaming
> analysis, and to cooperate with the new optimized window-TVF operations
> (FLIP-145), we need to support watermarks for Hive and Filesystem.
> h2. How to emit watermark in Hive and Filesystem
> Factual data in Hive are usually partitioned by date and time, e.g.
> {{pt_day=2021-03-19, pt_hour=10}}. In this case, when the data of partition
> {{pt_day=2021-03-19, pt_hour=10}} has been emitted, we know that all
> the data before {{2021-03-19 11:00:00}} has arrived, so we can emit a
> watermark value of {{2021-03-19 11:00:00}}. We call this a partition watermark.
> The partition watermark is much better than a record watermark (a watermark
> extracted from the record, e.g. {{ts - INTERVAL '1' MINUTE}}). In the above
> example, with a partition watermark, the window of [10:00, 11:00)
> is triggered when pt_hour=10 is finished. With a
> record watermark, the window of [10:00, 11:00) is triggered only when
> pt_hour=11 arrives, which delays the pipeline by one more partition.
> Therefore, we first focus on supporting partition watermarks for Hive and
> Filesystem.
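> As a point of comparison, a record watermark would be declared with the usual
> expression form of the WATERMARK clause. The sketch below is illustrative only:
> the table name {{fs_table_record_wm}} is hypothetical, and it assumes the
> streaming filesystem source would honor a record watermark, which this issue
> does not establish.
> {code:sql}
> -- hypothetical table; shown only to contrast with a partition watermark
> CREATE TABLE fs_table_record_wm (
>   x int,
>   y string,
>   z int,
>   ts TIMESTAMP(3),
>   pt_day string,
>   pt_hour string,
>   -- record watermark: derived from each row's ts, not from partition completion
>   WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
> ) PARTITIONED BY (pt_day, pt_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = '/path/to/file',
>   'format' = 'parquet'
> );
> {code}
> With such a definition, the window of [10:00, 11:00) can fire only after a row
> whose ts is roughly one minute past 11:00 has been read, and that row belongs
> to pt_hour=11.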
> h2. Example
> In order to support such watermarks, we propose using the following DDL to
> define a Hive table with a watermark:
> {code:sql}
> -- using hive dialect
> CREATE TABLE hive_table (
>   x int,
>   y string,
>   z int,
>   ts timestamp,
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day string, pt_hour string)
> TBLPROPERTIES (
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> -- window aggregate on the hive table
> SELECT window_start, window_end, COUNT(*), MAX(y), SUM(z)
> FROM TABLE(
>   TUMBLE(TABLE hive_table, DESCRIPTOR(ts), INTERVAL '1' HOUR))
> GROUP BY window_start, window_end;
> {code}
> For the filesystem connector, the DDL can be:
> {code:sql}
> CREATE TABLE fs_table (
>   x int,
>   y string,
>   z int,
>   ts TIMESTAMP(3),
>   pt_day string,
>   pt_hour string,
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> ) PARTITIONED BY (pt_day, pt_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = '/path/to/file',
>   'format' = 'parquet',
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> {code}
> I will explain the new function/configuration.
> h2. SOURCE_WATERMARK built-in function
> FLIP-66[1] proposed the {{SYSTEM_WATERMARK}} function for watermarks preserved in
> the underlying source system.
> However, the SYSTEM prefix sounds like a value generated by the Flink system, but
> it is actually a value generated by the SOURCE system.
> So I propose to use {{SOURCE_WATERMARK}} instead. This also keeps the concept
> aligned with the API of
> {{org.apache.flink.table.descriptors.Rowtime#watermarksFromSource}}.
> h2. Table Options for Watermark
> - {{partition.time-extractor.timestamp-pattern}}: this option already exists.
> This is used to extract/convert partition value to a timestamp value.
> - {{partition.time-interval}}: this is a new option. It indicates the minimal
> time interval of the partitions. It's used to calculate the correct watermark
> when a partition is finished: watermark = partition-timestamp +
> time-interval.
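> As a worked example of the two options, the arithmetic can be written as a
> plain Flink SQL query. This is only a sketch of the calculation, not how the
> connector evaluates the options: the pattern {{$pt_day $pt_hour:00:00}} is
> mimicked with CONCAT, and the inline VALUES rows are made-up partition values.
> {code:sql}
> SELECT
>   pt_day,
>   pt_hour,
>   -- partition-timestamp: '$pt_day $pt_hour:00:00' parsed as a timestamp
>   TO_TIMESTAMP(CONCAT(pt_day, ' ', pt_hour, ':00:00')) AS partition_ts,
>   -- watermark = partition-timestamp + partition.time-interval ('1h')
>   TO_TIMESTAMP(CONCAT(pt_day, ' ', pt_hour, ':00:00')) + INTERVAL '1' HOUR
>     AS partition_watermark
> FROM (VALUES ('2021-03-19', '10'), ('2021-03-19', '11')) AS t(pt_day, pt_hour);
> {code}
> For {{pt_day=2021-03-19, pt_hour=10}} this gives a watermark of
> {{2021-03-19 11:00:00}}, the value used in the partition watermark example.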
> h2. How to support watermark for existing Hive tables
> We can't create a new table for an existing Hive table, so
> we should support altering an existing Hive table to add the watermark
> information.
> This can be supported by the new ALTER TABLE syntax proposed in FLINK-21634.
> Because the watermark, computed columns, and table options are all encoded in
> Hive table parameters, other systems (e.g. Hive MR, Spark) can still read
> this Hive table as usual.
> {code:sql}
> ALTER TABLE hive_table ADD (
>   WATERMARK FOR ts AS SOURCE_WATERMARK
> );
> ALTER TABLE hive_table SET (
>   'streaming-source.enable'='true',
>   'streaming-source.monitor-interval'='1s',
>   'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
>   'partition.time-interval'='1h'
> );
> {code}
> h2. Implementation Details
> 1. SplitEnumerator: monitors new partitions through {{PartitionMonitor}},
> sorts partitions by partition name, adds new splits of new partitions to
> {{SplitAssigner}}, and tags the last split of each partition.
> 2. SourceReader: requests a split from the SplitEnumerator at setup and after
> it has read out a split.
> 3. SplitEnumerator: gets a split from the SplitAssigner and assigns it to the
> requesting reader. If the split is the last one of the partition, it
> broadcasts a watermark event to all the readers.
> 4. SourceReader receives a split: starts to read the data of the assigned split.
> 5. SourceReader receives a watermark: if there are assigned splits, outputs the
> received watermark once those splits are read out. If there are no assigned
> splits, outputs the received watermark immediately.
> Note: the SplitAssigner should assign splits in FIFO order.
> The above implementation doesn't require any new interface or new method in the
> FLIP-27 source. Everything can be implemented in the Hive/Filesystem connector module.
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL