This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8519224de3 Flink: Document watermark generation feature (#9179)
8519224de3 is described below
commit 8519224de33b26cfd7c539ffe6f123ea66165711
Author: pvary <[email protected]>
AuthorDate: Tue Dec 5 17:48:10 2023 +0100
Flink: Document watermark generation feature (#9179)
---
docs/flink-queries.md | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 69 insertions(+)
diff --git a/docs/flink-queries.md b/docs/flink-queries.md
index 4cef5468cd..cf68fa367c 100644
--- a/docs/flink-queries.md
+++ b/docs/flink-queries.md
@@ -277,6 +277,75 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
     "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
```
+### Emitting watermarks
+Emitting watermarks from the source itself can be beneficial for several purposes, such as harnessing
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or preventing [windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+from triggering too early when reading multiple data files concurrently.
+
+Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
+The supported column types are `timestamp`, `timestamptz`, and `long`.
+Iceberg `timestamp` and `timestamptz` columns inherently carry the time precision, so there is no need
+to specify the time unit. A `long` column, however, carries no time unit information; use
+`watermarkTimeUnit` to configure the conversion for `long` columns.
+
+The watermarks are generated from the column metrics stored for data files and are emitted once per split.
+If multiple smaller files with different time ranges are combined into a single split, the watermark lags
+behind the earliest file, which increases out-of-orderliness and extra data buffering in the Flink state.
+Since the main purpose of watermark alignment is to reduce out-of-orderliness and excess data buffering
+in the Flink state, it is recommended to set `read.split.open-file-cost` to a very large value to prevent
+combining multiple smaller files into a single split. The cost of not combining small files into a single
+split is lower read throughput, especially when there are many small files. In typical stateful processing
+jobs, source read throughput is not the bottleneck, so this is usually a reasonable tradeoff.
+
+This feature requires column-level min-max stats. Make sure stats are generated for the watermark column
+during the write phase. By default, column metrics are collected for the first 100 columns of the table.
+If the watermark column doesn't have stats enabled by default, use the
+[write properties](configuration.md#write-properties) starting with `write.metadata.metrics` to enable them.
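+As a minimal sketch of enabling such stats through the Iceberg `Table` API (the column name
+`long_column` and the already-configured `TableLoader` are assumptions here, not part of this change):
+```java
+// Assumption: tableLoader points at an existing Iceberg table.
+Table table = tableLoader.loadTable();
+table
+    .updateProperties()
+    // "full" keeps complete min-max stats for the watermark column
+    .set("write.metadata.metrics.column.long_column", "full")
+    .commit();
+```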
+
+The following example is useful when watermarks are used for windowing. The source reads Iceberg data
+files in order by the timestamp column and emits watermarks:
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Watermark using timestamp column
+            .watermarkColumn("timestamp_column")
+            .build(),
+        // Watermarks are generated by the source, no need to generate it manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            // Extract event timestamp from records
+            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+```
+
+An example of reading an Iceberg table using a `long` event column for watermark alignment:
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+DataStream<RowData> stream =
+    env.fromSource(
+        IcebergSource.forRowData()
+            .tableLoader(tableLoader)
+            // Disable combining multiple files to a single split
+            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
+            // Watermark using long column
+            .watermarkColumn("long_column")
+            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
+            .build(),
+        // Watermarks are generated by the source, no need to generate it manually
+        WatermarkStrategy.<RowData>noWatermarks()
+            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
+        SOURCE_NAME,
+        TypeInformation.of(RowData.class));
+```
+
## Options
### Read options