Fokko commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1135184392


##########
docs/flink-ddl.md:
##########
@@ -0,0 +1,234 @@
+---
+title: "Flink DDL"
+url: flink-ddl
+aliases:
+    - "flink/flink-ddl"
+menu:
+    main:
+        parent: Flink
+        weight: 200
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## DDL commands

Review Comment:
   For the other docs, we tend to start with a single hash instead of two.



##########
docs/flink-actions.md:
##########
@@ -0,0 +1,42 @@
+---
+title: "Flink Actions"
+url: flink-actions
+aliases:
+    - "flink/flink-actions"
+menu:
+    main:
+        parent: Flink
+        weight: 500
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Rewrite files action.
+
+Iceberg provides API to rewrite small files into large files by submitting 
flink batch job. The behavior of this flink action is the same as the spark's 
[rewriteDataFiles](../maintenance/#compact-data-files).
+
+```java
+import org.apache.iceberg.flink.actions.Actions;
+
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+Table table = tableLoader.loadTable();
+RewriteDataFilesActionResult result = Actions.forTable(table)
+        .rewriteDataFiles()
+        .execute();
+```
+
+For more doc about options of the rewrite files action, please see 
[RewriteDataFilesAction](../../../javadoc/{{% icebergVersion 
%}}/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html)

Review Comment:
   ```suggestion
   For more details of the rewrite files action, please refer to 
[RewriteDataFilesAction](../../../javadoc/{{% icebergVersion 
%}}/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html)
   ```



##########
docs/flink-writes.md:
##########
@@ -0,0 +1,269 @@
+---
+title: "Flink Writes"
+url: flink-writes
+aliases:
+    - "flink/flink-writes"
+menu:
+    main:
+        parent: Flink
+        weight: 400
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Writing with SQL
+
+Iceberg support both `INSERT INTO` and `INSERT OVERWRITE`.
+
+### `INSERT INTO`
+
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
+
+```sql
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
+INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from 
other_kafka_table;
+```
+
+### `INSERT OVERWRITE`
+
+To replace data in the table with the result of a query, use `INSERT 
OVERWRITE` in batch job (flink streaming job does not support `INSERT 
OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+
+Partitions that have rows produced by the SELECT query will be replaced, for 
example:
+
+```sql
+INSERT OVERWRITE sample VALUES (1, 'a');
+```
+
+Iceberg also support overwriting given partitions by the `select` values:
+
+```sql
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 
6;
+```
+
+For a partitioned iceberg table, when all the partition columns are set a 
value in `PARTITION` clause, it is inserting into a static partition, otherwise 
if partial partition columns (prefix part of all partition columns) are set a 
value in `PARTITION` clause, it is writing the query result into a dynamic 
partition.
+For an unpartitioned iceberg table, its data will be completely overwritten by 
`INSERT OVERWRITE`.
+
+### `UPSERT`
+
+Iceberg supports `UPSERT` based on the primary key when writing data into v2 
table format. There are two ways to enable upsert.
+
+1. Enable the `UPSERT` mode as table-level property `write.upsert.enabled`. 
Here is an example SQL statement to set the table property when creating a 
table. It would be applied for all write paths to this table (batch or 
streaming) unless overwritten by write options as described later.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+  `id`  INT UNIQUE COMMENT 'unique id',
+  `data` STRING NOT NULL,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) with ('format-version'='2', 'write.upsert.enabled'='true');
+```
+
+2. Enabling `UPSERT` mode using `upsert-enabled` in the [write 
options](#Write-options) provides more flexibility than a table level config. 
Note that you still need to use v2 table format and specify the primary key 
when creating the table.
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+
+
+## Writing with DataStream
+
+Iceberg support writing to iceberg table from different DataStream input.
+
+
+### Appending data.
+
+Flink supports writing `DataStream<RowData>` and `DataStream<Row>` to the sink 
iceberg table natively.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+The iceberg API also allows users to write generic `DataStream<T>` to iceberg 
table, more example could be found in this [unit 
test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+
+### Overwrite data
+
+Set the `overwrite` flag in FlinkSink builder to overwrite the data in 
existing iceberg tables:
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .overwrite(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+### Upsert data
+
+Set the `upsert` flag in FlinkSink builder to upsert the data in existing 
iceberg table. The table must use v2 table format and have a primary key.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .upsert(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+### Write with Avro GenericRecord
+
+Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper` that converts
+Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write
+Avro GenericRecord DataStream to Iceberg.
+
+Please make sure `flink-avro` jar is included in the classpath.
+Also `iceberg-flink-runtime` shaded bundle jar can't be used
+because the runtime jar shades the avro package.
+Please use non-shaded `iceberg-flink` jar instead.
+
+```java
+DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
+
+Schema icebergSchema = table.schema();
+
+
+// The Avro schema converted from Iceberg schema can't be used
+// due to precision difference between how Iceberg schema (micro)
+// and Flink AvroToRowDataConverters (milli) deal with time type.
+// Instead, use the Avro schema defined directly.
+// See AvroGenericRecordToRowDataMapper Javadoc for more details.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, 
table.name());
+
+GenericRecordAvroTypeInfo avroTypeInfo = new 
GenericRecordAvroTypeInfo(avroSchema);
+RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+
+FlinkSink.builderFor(
+    dataStream,
+    AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
+    FlinkCompatibilityUtil.toTypeInfo(rowType))
+  .table(table)
+  .tableLoader(tableLoader)
+  .append();
+```
+
+### Netrics

Review Comment:
   ```suggestion
   ### Metrics
   ```



##########
docs/flink-writes.md:
##########
@@ -0,0 +1,269 @@
+---
+title: "Flink Writes"
+url: flink-writes
+aliases:
+    - "flink/flink-writes"
+menu:
+    main:
+        parent: Flink
+        weight: 400
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Writing with SQL
+
+Iceberg support both `INSERT INTO` and `INSERT OVERWRITE`.
+
+### `INSERT INTO`
+
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
+
+```sql
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
+INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from 
other_kafka_table;
+```
+
+### `INSERT OVERWRITE`
+
+To replace data in the table with the result of a query, use `INSERT 
OVERWRITE` in batch job (flink streaming job does not support `INSERT 
OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+
+Partitions that have rows produced by the SELECT query will be replaced, for 
example:
+
+```sql
+INSERT OVERWRITE sample VALUES (1, 'a');
+```
+
+Iceberg also support overwriting given partitions by the `select` values:
+
+```sql
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 
6;
+```
+
+For a partitioned iceberg table, when all the partition columns are set a 
value in `PARTITION` clause, it is inserting into a static partition, otherwise 
if partial partition columns (prefix part of all partition columns) are set a 
value in `PARTITION` clause, it is writing the query result into a dynamic 
partition.
+For an unpartitioned iceberg table, its data will be completely overwritten by 
`INSERT OVERWRITE`.
+
+### `UPSERT`
+
+Iceberg supports `UPSERT` based on the primary key when writing data into v2 
table format. There are two ways to enable upsert.
+
+1. Enable the `UPSERT` mode as table-level property `write.upsert.enabled`. 
Here is an example SQL statement to set the table property when creating a 
table. It would be applied for all write paths to this table (batch or 
streaming) unless overwritten by write options as described later.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+  `id`  INT UNIQUE COMMENT 'unique id',
+  `data` STRING NOT NULL,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) with ('format-version'='2', 'write.upsert.enabled'='true');
+```
+
+2. Enabling `UPSERT` mode using `upsert-enabled` in the [write 
options](#Write-options) provides more flexibility than a table level config. 
Note that you still need to use v2 table format and specify the primary key 
when creating the table.
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+
+
+## Writing with DataStream
+
+Iceberg support writing to iceberg table from different DataStream input.
+
+
+### Appending data.
+
+Flink supports writing `DataStream<RowData>` and `DataStream<Row>` to the sink 
iceberg table natively.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+The iceberg API also allows users to write generic `DataStream<T>` to iceberg 
table, more example could be found in this [unit 
test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+
+### Overwrite data
+
+Set the `overwrite` flag in FlinkSink builder to overwrite the data in 
existing iceberg tables:
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .overwrite(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+### Upsert data
+
+Set the `upsert` flag in FlinkSink builder to upsert the data in existing 
iceberg table. The table must use v2 table format and have a primary key.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .upsert(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+### Write with Avro GenericRecord
+
+Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper` that converts
+Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write
+Avro GenericRecord DataStream to Iceberg.
+
+Please make sure `flink-avro` jar is included in the classpath.
+Also `iceberg-flink-runtime` shaded bundle jar can't be used
+because the runtime jar shades the avro package.
+Please use non-shaded `iceberg-flink` jar instead.
+
+```java
+DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
+
+Schema icebergSchema = table.schema();
+
+
+// The Avro schema converted from Iceberg schema can't be used
+// due to precision difference between how Iceberg schema (micro)
+// and Flink AvroToRowDataConverters (milli) deal with time type.
+// Instead, use the Avro schema defined directly.
+// See AvroGenericRecordToRowDataMapper Javadoc for more details.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, 
table.name());
+
+GenericRecordAvroTypeInfo avroTypeInfo = new 
GenericRecordAvroTypeInfo(avroSchema);
+RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+
+FlinkSink.builderFor(
+    dataStream,
+    AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
+    FlinkCompatibilityUtil.toTypeInfo(rowType))
+  .table(table)
+  .tableLoader(tableLoader)
+  .append();
+```
+
+### Netrics
+
+The following Flink metrics are provided by the Flink Iceberg sink.
+
+Parallel writer metrics are added under the sub group of `IcebergStreamWriter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+* subtask_index: writer subtask index starting from 0
+
+ Metric name                | Metric type | Description                        
                                                                 |
+| ------------------------- 
|------------|-----------------------------------------------------------------------------------------------------|
+| lastFlushDurationMs       | Gague      | The duration (in milli) that writer 
subtasks take to flush and upload the files during checkpoint.  |
+| flushedDataFiles          | Counter    | Number of data files flushed and 
uploaded.                                                          |
+| flushedDeleteFiles        | Counter    | Number of delete files flushed and 
uploaded.                                                        |
+| flushedReferencedDataFiles| Counter    | Number of data files referenced by 
the flushed delete files.                                        |
+| dataFilesSizeHistogram    | Histogram  | Histogram distribution of data file 
sizes (in bytes).                                               |
+| deleteFilesSizeHistogram  | Histogram  | Histogram distribution of delete 
file sizes (in bytes).                                             |
+
+Committer metrics are added under the sub group of `IcebergFilesCommitter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+
+ Metric name                      | Metric type | Description                  
                                              |
+|---------------------------------|--------|----------------------------------------------------------------------------|
+| lastCheckpointDurationMs        | Gague  | The duration (in milli) that the 
committer operator checkpoints its state. |
+| lastCommitDurationMs            | Gague  | The duration (in milli) that the 
Iceberg table commit takes.               |
+| committedDataFilesCount         | Counter | Number of data files committed.  
                                          |
+| committedDataFilesRecordCount   | Counter | Number of records contained in 
the committed data files.                   |
+| committedDataFilesByteCount     | Counter | Number of bytes contained in the 
committed data files.                     |
+| committedDeleteFilesCount       | Counter | Number of delete files 
committed.                                          |
+| committedDeleteFilesRecordCount | Counter | Number of records contained in 
the committed delete files.                 |
+| committedDeleteFilesByteCount   | Counter | Number of bytes contained in the 
committed delete files.                   |
+| elapsedSecondsSinceLastSuccessfulCommit| Gague  | Elapsed time (in seconds) 
since last successful Iceberg commit.            |
+
+`elapsedSecondsSinceLastSuccessfulCommit` is an ideal alerting metric
+to detect failed or missing Iceberg commits.
+
+* Iceberg commit happened after successful Flink checkpoint in the 
`notifyCheckpointComplete` callback.
+  It could happen that Iceberg commits failed (for whatever reason), while 
Flink checkpoints succeeding.
+* It could also happen that `notifyCheckpointComplete` wasn't triggered (for 
whatever bug).
+  As a result, there won't be any Iceberg commits attempted.
+
+If the checkpoint interval (and expected Iceberg commit interval) is 5 
minutes, set up alert with rule like `elapsedSecondsSinceLastSuccessfulCommit > 
60 minutes` to detect failed or missing Iceberg commits in the past hour.
+
+
+
+## Options
+
+### Write options
+
+Flink write options are passed when configuring the FlinkSink, like this:
+
+```

Review Comment:
   ```suggestion
   ```java
   ```



##########
docs/flink-queries.md:
##########
@@ -0,0 +1,468 @@
+---
+title: "Flink Queries"
+url: flink-queries
+aliases:
+   - "flink/flink-queries"
+menu:
+   main:
+      parent: Flink
+      weight: 300
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Reading with SQL
+
+Iceberg support both streaming and batch read in Flink. Execute the following 
sql command to switch execution mode from `streaming` to `batch`, and vice 
versa:
+
+```sql
+-- Execute the flink job in streaming mode for current session context
+SET execution.runtime-mode = streaming;
+
+-- Execute the flink job in batch mode for current session context
+SET execution.runtime-mode = batch;
+```
+
+### Flink batch read
+
+Submit a Flink __batch__ job using the following sentences:
+
+```sql
+-- Execute the flink job in batch mode for current session context
+SET execution.runtime-mode = batch;
+SELECT * FROM sample;
+```
+
+### Flink streaming read
+
+Iceberg supports processing incremental data in flink streaming jobs which 
starts from a historical snapshot-id:

Review Comment:
   ```suggestion
   Iceberg supports processing incremental data in Flink streaming jobs which 
starts from a historical snapshot-id:
   ```



##########
docs/flink-ddl.md:
##########
@@ -0,0 +1,234 @@
+---
+title: "Flink DDL"
+url: flink-ddl
+aliases:
+    - "flink/flink-ddl"
+menu:
+    main:
+        parent: Flink
+        weight: 200
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## DDL commands
+
+###  `Create Catalog`
+
+#### Catalog Configuration

Review Comment:
   Should we split this one out into `flink-configuration.md`? If you're 
looking for DDL, you'll first encounter a long list of catalog configuration 
options.



##########
docs/flink-actions.md:
##########
@@ -0,0 +1,42 @@
+---
+title: "Flink Actions"
+url: flink-actions
+aliases:
+    - "flink/flink-actions"
+menu:
+    main:
+        parent: Flink
+        weight: 500
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Rewrite files action.
+
+Iceberg provides API to rewrite small files into large files by submitting 
flink batch job. The behavior of this flink action is the same as the spark's 
[rewriteDataFiles](../maintenance/#compact-data-files).

Review Comment:
   ```suggestion
   Iceberg provides API to rewrite small files into large files by submitting 
Flink batch jobs. The behavior of this Flink action is the same as Spark's 
[rewriteDataFiles](../maintenance/#compact-data-files).
   ```



##########
docs/flink-actions.md:
##########
@@ -0,0 +1,42 @@
+---
+title: "Flink Actions"
+url: flink-actions
+aliases:
+    - "flink/flink-actions"
+menu:
+    main:
+        parent: Flink
+        weight: 500
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Rewrite files action.
+
+Iceberg provides API to rewrite small files into large files by submitting 
flink batch job. The behavior of this flink action is the same as the spark's 
[rewriteDataFiles](../maintenance/#compact-data-files).

Review Comment:
   With https://github.com/apache/iceberg/pull/7070, I capitalized Spark and 
Flink. Since it is not a verb (yet). But I'm a Dutchie and not a native English 
speaker.



##########
docs/flink-writes.md:
##########
@@ -0,0 +1,269 @@
+---
+title: "Flink Writes"
+url: flink-writes
+aliases:
+    - "flink/flink-writes"
+menu:
+    main:
+        parent: Flink
+        weight: 400
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Writing with SQL
+
+Iceberg support both `INSERT INTO` and `INSERT OVERWRITE`.
+
+### `INSERT INTO`
+
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
+
+```sql
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
+INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from 
other_kafka_table;
+```
+
+### `INSERT OVERWRITE`
+
+To replace data in the table with the result of a query, use `INSERT 
OVERWRITE` in batch job (flink streaming job does not support `INSERT 
OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+
+Partitions that have rows produced by the SELECT query will be replaced, for 
example:
+
+```sql
+INSERT OVERWRITE sample VALUES (1, 'a');
+```
+
+Iceberg also support overwriting given partitions by the `select` values:
+
+```sql
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 
6;
+```
+
+For a partitioned iceberg table, when all the partition columns are set a 
value in `PARTITION` clause, it is inserting into a static partition, otherwise 
if partial partition columns (prefix part of all partition columns) are set a 
value in `PARTITION` clause, it is writing the query result into a dynamic 
partition.
+For an unpartitioned iceberg table, its data will be completely overwritten by 
`INSERT OVERWRITE`.
+
+### `UPSERT`
+
+Iceberg supports `UPSERT` based on the primary key when writing data into v2 
table format. There are two ways to enable upsert.
+
+1. Enable the `UPSERT` mode as table-level property `write.upsert.enabled`. 
Here is an example SQL statement to set the table property when creating a 
table. It would be applied for all write paths to this table (batch or 
streaming) unless overwritten by write options as described later.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+  `id`  INT UNIQUE COMMENT 'unique id',
+  `data` STRING NOT NULL,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) with ('format-version'='2', 'write.upsert.enabled'='true');
+```
+
+2. Enabling `UPSERT` mode using `upsert-enabled` in the [write 
options](#Write-options) provides more flexibility than a table level config. 
Note that you still need to use v2 table format and specify the primary key 
when creating the table.
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+
+
+## Writing with DataStream
+
+Iceberg support writing to iceberg table from different DataStream input.
+
+
+### Appending data.
+
+Flink supports writing `DataStream<RowData>` and `DataStream<Row>` to the sink 
iceberg table natively.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+The iceberg API also allows users to write generic `DataStream<T>` to iceberg 
table, more example could be found in this [unit 
test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+
+### Overwrite data
+
+Set the `overwrite` flag in FlinkSink builder to overwrite the data in 
existing iceberg tables:
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .overwrite(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+### Upsert data
+
+Set the `upsert` flag in FlinkSink builder to upsert the data in existing 
iceberg table. The table must use v2 table format and have a primary key.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .upsert(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is 
partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+### Write with Avro GenericRecord
+
+Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper` that converts
+Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write
+Avro GenericRecord DataStream to Iceberg.
+
+Please make sure `flink-avro` jar is included in the classpath.
+Also `iceberg-flink-runtime` shaded bundle jar can't be used
+because the runtime jar shades the avro package.
+Please use non-shaded `iceberg-flink` jar instead.
+
+```java
+DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
+
+Schema icebergSchema = table.schema();
+
+
+// The Avro schema converted from Iceberg schema can't be used
+// due to precision difference between how Iceberg schema (micro)
+// and Flink AvroToRowDataConverters (milli) deal with time type.
+// Instead, use the Avro schema defined directly.
+// See AvroGenericRecordToRowDataMapper Javadoc for more details.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, 
table.name());
+
+GenericRecordAvroTypeInfo avroTypeInfo = new 
GenericRecordAvroTypeInfo(avroSchema);
+RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+
+FlinkSink.builderFor(
+    dataStream,
+    AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
+    FlinkCompatibilityUtil.toTypeInfo(rowType))
+  .table(table)
+  .tableLoader(tableLoader)
+  .append();
+```
+
+### Netrics
+
+The following Flink metrics are provided by the Flink Iceberg sink.
+
+Parallel writer metrics are added under the sub group of `IcebergStreamWriter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+* subtask_index: writer subtask index starting from 0
+
+ Metric name                | Metric type | Description                        
                                                                 |
+| ------------------------- 
|------------|-----------------------------------------------------------------------------------------------------|
+| lastFlushDurationMs       | Gague      | The duration (in milli) that writer 
subtasks take to flush and upload the files during checkpoint.  |
+| flushedDataFiles          | Counter    | Number of data files flushed and 
uploaded.                                                          |
+| flushedDeleteFiles        | Counter    | Number of delete files flushed and 
uploaded.                                                        |
+| flushedReferencedDataFiles| Counter    | Number of data files referenced by 
the flushed delete files.                                        |
+| dataFilesSizeHistogram    | Histogram  | Histogram distribution of data file 
sizes (in bytes).                                               |
+| deleteFilesSizeHistogram  | Histogram  | Histogram distribution of delete 
file sizes (in bytes).                                             |
+
+Committer metrics are added under the sub group of `IcebergFilesCommitter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+
+ Metric name                      | Metric type | Description                  
                                              |
+|---------------------------------|--------|----------------------------------------------------------------------------|
+| lastCheckpointDurationMs        | Gague  | The duration (in milli) that the 
committer operator checkpoints its state. |
+| lastCommitDurationMs            | Gague  | The duration (in milli) that the 
Iceberg table commit takes.               |
+| committedDataFilesCount         | Counter | Number of data files committed.  
                                          |
+| committedDataFilesRecordCount   | Counter | Number of records contained in 
the committed data files.                   |
+| committedDataFilesByteCount     | Counter | Number of bytes contained in the 
committed data files.                     |
+| committedDeleteFilesCount       | Counter | Number of delete files 
committed.                                          |
+| committedDeleteFilesRecordCount | Counter | Number of records contained in 
the committed delete files.                 |
+| committedDeleteFilesByteCount   | Counter | Number of bytes contained in the 
committed delete files.                   |
+| elapsedSecondsSinceLastSuccessfulCommit| Gague  | Elapsed time (in seconds) 
since last successful Iceberg commit.            |
+
+`elapsedSecondsSinceLastSuccessfulCommit` is an ideal alerting metric
+to detect failed or missing Iceberg commits.
+
+* Iceberg commit happened after successful Flink checkpoint in the 
`notifyCheckpointComplete` callback.
+  It could happen that Iceberg commits failed (for whatever reason), while 
Flink checkpoints succeeding.
+* It could also happen that `notifyCheckpointComplete` wasn't triggered (for 
whatever bug).
+  As a result, there won't be any Iceberg commits attempted.
+
+If the checkpoint interval (and expected Iceberg commit interval) is 5 
minutes, set up alert with rule like `elapsedSecondsSinceLastSuccessfulCommit > 
60 minutes` to detect failed or missing Iceberg commits in the past hour.
+
+
+
+## Options
+
+### Write options
+
+Flink write options are passed when configuring the FlinkSink, like this:
+
+```
+FlinkSink.Builder builder = FlinkSink.forRow(dataStream, 
SimpleDataUtil.FLINK_SCHEMA)
+    .table(table)
+    .tableLoader(tableLoader)
+    .set("write-format", "orc")
+    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
+```
+
+For Flink SQL, write options can be passed in via SQL hints like this:
+
+```

Review Comment:
   ```suggestion
   ```sql
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to