This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8519224de3 Flink: Document watermark generation feature (#9179)
8519224de3 is described below

commit 8519224de33b26cfd7c539ffe6f123ea66165711
Author: pvary <[email protected]>
AuthorDate: Tue Dec 5 17:48:10 2023 +0100

    Flink: Document watermark generation feature (#9179)
---
 docs/flink-queries.md | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/docs/flink-queries.md b/docs/flink-queries.md
index 4cef5468cd..cf68fa367c 100644
--- a/docs/flink-queries.md
+++ b/docs/flink-queries.md
@@ -277,6 +277,75 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()
     "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
 ```
 
+### Emitting watermarks
+Emitting watermarks from the source itself can be beneficial for several purposes, like harnessing the
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or preventing [windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+from triggering too early when reading multiple data files concurrently.
+
+Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz` and `long`.
+Iceberg `timestamp` and `timestamptz` columns inherently contain the time precision, so there is no need
+to specify the time unit for them. A `long` column, however, carries no time unit information; use
+`watermarkTimeUnit` to configure the conversion for `long` columns.
+
+The watermarks are generated based on column metrics stored for data files and emitted once per split.
+If multiple smaller files with different time ranges are combined into a single split, it can increase
+the out-of-orderliness and extra data buffering in the Flink state. The main purpose of watermark alignment
+is to reduce out-of-orderliness and excess data buffering in the Flink state. Hence it is recommended to
+set `read.split.open-file-cost` to a very large value to prevent combining multiple smaller files into a
+single split. The negative impact (of not combining small files into a single split) is on read throughput,
+especially if there are many small files. In typical stateful processing jobs, source read throughput is not
+the bottleneck, so this is usually a reasonable tradeoff.
+
+This feature requires column-level min-max stats. Make sure stats are generated for the watermark column
+during the write phase. By default, column metrics are collected for the first 100 columns of the table.
+If the watermark column doesn't have stats enabled by default, use
+[write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
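+
+For example, if the watermark column falls outside the default metrics range, full min-max metrics for
+it can be enabled through the Iceberg `Table` API. This is a sketch of setting the relevant table
+property; the column name `long_column` and the warehouse path are illustrative:
+```java
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+tableLoader.open();
+Table table = tableLoader.loadTable();
+// Collect full min-max stats for the (hypothetical) watermark column "long_column"
+table.updateProperties()
+    .set("write.metadata.metrics.column.long_column", "full")
+    .commit();
+```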
+
+The following example could be useful if watermarks are used for windowing. The source reads Iceberg data files
+in order using a timestamp column, and emits watermarks:
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Watermark using timestamp column
+            .watermarkColumn("timestamp_column")
+            .build(),
+        // Watermarks are generated by the source, no need to generate them manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            // Extract event timestamp from records
+            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+```
+
+Example for reading an Iceberg table using a long event column for watermark alignment:
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Disable combining multiple files into a single split
+            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
+            // Watermark using long column
+            .watermarkColumn("long_column")
+            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
+            .build(),
+        // Watermarks are generated by the source, no need to generate them manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+```
+
 ## Options
 
 ### Read options
