This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 179707346a Flink: Update the docs (#7070)
179707346a is described below

commit 179707346a22c53e2d4811a061651f65239d0d66
Author: Fokko Driesprong <[email protected]>
AuthorDate: Sun Mar 12 01:36:33 2023 +0100

    Flink: Update the docs (#7070)
---
 docs/flink-getting-started.md | 239 +++++++++++++++++++-----------------------
 1 file changed, 108 insertions(+), 131 deletions(-)

diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index c95230b5df..b60e29965f 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -46,128 +46,83 @@ Apache Iceberg supports both [Apache 
Flink](https://flink.apache.org/)'s DataStr
 
 ## Preparation when using Flink SQL Client
 
-To create iceberg table in flink, we recommend to use [Flink SQL 
Client](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html)
 because it's easier for users to understand the concepts.
+To create an Iceberg table in Flink, it is recommended to use [Flink SQL 
Client](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html)
 as it's easier for users to understand the concepts.
 
-Step.1 Downloading the flink 1.11.x binary package from the apache flink 
[download page](https://flink.apache.org/downloads.html). We now use scala 2.12 
to archive the apache iceberg-flink-runtime jar, so it's recommended to use 
flink 1.11 bundled with scala 2.12.
+Download Flink from the [Apache download 
page](https://flink.apache.org/downloads.html). Iceberg uses Scala 2.12 when 
compiling the Apache `iceberg-flink-runtime` jar, so it's recommended to use 
Flink 1.16 bundled with Scala 2.12.
 
 ```bash
-FLINK_VERSION=1.11.1
+FLINK_VERSION=1.16.1
 SCALA_VERSION=2.12
-APACHE_FLINK_URL=archive.apache.org/dist/flink/
+APACHE_FLINK_URL=https://archive.apache.org/dist/flink/
 wget 
${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
 tar xzvf flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
 ```
 
-Step.2 Start a standalone flink cluster within hadoop environment.
+Start a standalone Flink cluster within a Hadoop environment:
 
 ```bash
 # HADOOP_HOME is your hadoop root directory after unpack the binary package.
+APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/
+HADOOP_VERSION=2.8.5
+wget 
${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
+tar xzvf hadoop-${HADOOP_VERSION}.tar.gz
+HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}
+
 export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 
 # Start the flink standalone cluster
 ./bin/start-cluster.sh
 ```
 
-Step.3 Start the flink SQL client.
-
-We've created a separate `flink-runtime` module in iceberg project to generate 
a bundled jar, which could be loaded by flink SQL client directly.
-
-If we want to build the `flink-runtime` bundled jar manually, please just 
build the `iceberg` project and it will generate the jar under 
`<iceberg-root-dir>/flink-runtime/build/libs`. Of course, we could also 
download the `flink-runtime` jar from the [apache official 
repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/).
+Start the Flink SQL client. There is a separate `flink-runtime` module in the 
Iceberg project to generate a bundled jar, which could be loaded by Flink SQL 
client directly. To build the `flink-runtime` bundled jar manually, build the 
`iceberg` project, and it will generate the jar under 
`<iceberg-root-dir>/flink-runtime/build/libs`. Or download the `flink-runtime` 
jar from the [Apache 
repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/{{%
 icebe [...]
 
 ```bash
 # HADOOP_HOME is your hadoop root directory after unpack the binary package.
-export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
+export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`   
 
-./bin/sql-client.sh embedded -j 
<flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell
+./bin/sql-client.sh embedded -j 
<flink-runtime-directory>/iceberg-flink-runtime-1.16-{{% icebergVersion %}}.jar 
shell
 ```
 
-By default, iceberg has included hadoop jars for hadoop catalog. If we want to 
use hive catalog, we will need to load the hive jars when opening the flink sql 
client. Fortunately, apache flink has provided a [bundled hive 
jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.11.0/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar)
 for sql client. So we could open the sql client
-as the following:
+By default, Iceberg ships with Hadoop jars for Hadoop catalog. To use Hive 
catalog, load the Hive jars when opening the Flink SQL client. Fortunately, 
Flink has provided a [bundled hive 
jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.1/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar)
 for the SQL client. An example on how to download the dependencies and get 
started:
 
 ```bash
 # HADOOP_HOME is your hadoop root directory after unpack the binary package.
 export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 
-# download Iceberg dependency
-ICEBERG_VERSION=0.11.1
+ICEBERG_VERSION={{% icebergVersion %}}
 MAVEN_URL=https://repo1.maven.org/maven2
 ICEBERG_MAVEN_URL=${MAVEN_URL}/org/apache/iceberg
 ICEBERG_PACKAGE=iceberg-flink-runtime
-wget 
${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${ICEBERG_VERSION}.jar
+wget 
${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}-${ICEBERG_VERSION}.jar
 -P lib/
 
-# download the 
flink-sql-connector-hive-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar
-HIVE_VERSION=2.3.6
-SCALA_VERSION=2.11
-FLINK_VERSION=1.11.0
+HIVE_VERSION=2.3.9
+SCALA_VERSION=2.12
+FLINK_VERSION=1.16.1
 FLINK_CONNECTOR_URL=${MAVEN_URL}/org/apache/flink
 FLINK_CONNECTOR_PACKAGE=flink-sql-connector-hive
 wget 
${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}/${FLINK_VERSION}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar
 
-# open the SQL client.
-/path/to/bin/sql-client.sh embedded \
-    -j ${ICEBERG_PACKAGE}-${ICEBERG_VERSION}.jar \
-    -j 
${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar
 \
-    shell
+./bin/sql-client.sh embedded shell
 ```
-## Preparation when using Flink's Python API
 
-Install the Apache Flink dependency using `pip`
-```python
-pip install apache-flink==1.11.1
-```
-
-In order for `pyflink` to function properly, it needs to have access to all 
Hadoop jars. For `pyflink`
-we need to copy those Hadoop jars to the installation directory of `pyflink`, 
which can be found under
-`<PYTHON_ENV_INSTALL_DIR>/site-packages/pyflink/lib/` (see also a mention of 
this on
-the [Flink 
ML](http://mail-archives.apache.org/mod_mbox/flink-user/202105.mbox/%3C3D98BDD2-89B1-42F5-B6F4-6C06A038F978%40gmail.com%3E)).
-We can use the following short Python script to copy all Hadoop jars (you need 
to make sure that `HADOOP_HOME`
-points to your Hadoop installation):
-
-```python
-import os
-import shutil
-import site
+## Flink's Python API
 
+{{< hint info >}}
+PyFlink 1.16.1 [does not work on macOS with an M1 
CPU](https://issues.apache.org/jira/browse/FLINK-28786) 
+{{< /hint >}}
 
-def copy_all_hadoop_jars_to_pyflink():
-    if not os.getenv("HADOOP_HOME"):
-        raise Exception("The HADOOP_HOME env var must be set and point to a 
valid Hadoop installation")
-
-    jar_files = []
-
-    def find_pyflink_lib_dir():
-        for dir in site.getsitepackages():
-            package_dir = os.path.join(dir, "pyflink", "lib")
-            if os.path.exists(package_dir):
-                return package_dir
-        return None
-
-    for root, _, files in os.walk(os.getenv("HADOOP_HOME")):
-        for file in files:
-            if file.endswith(".jar"):
-                jar_files.append(os.path.join(root, file))
-
-    pyflink_lib_dir = find_pyflink_lib_dir()
-
-    num_jar_files = len(jar_files)
-    print(f"Copying {num_jar_files} Hadoop jar files to pyflink's lib 
directory at {pyflink_lib_dir}")
-    for jar in jar_files:
-        shutil.copy(jar, pyflink_lib_dir)
-
+Install the Apache Flink dependency using `pip`:
 
-if __name__ == '__main__':
-    copy_all_hadoop_jars_to_pyflink()
+```bash
+pip install apache-flink==1.16.1
 ```
 
-Once the script finished, you should see output similar to
-```
-Copying 645 Hadoop jar files to pyflink's lib directory at 
<PYTHON_DIR>/lib/python3.8/site-packages/pyflink/lib
-```
+Provide a `file://` path to the `iceberg-flink-runtime` jar, which can be 
obtained by building the project and looking at 
`<iceberg-root-dir>/flink-runtime/build/libs`, or downloading it from the 
[Apache official 
repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/).
 Third-party jars can be added to `pyflink` via:
+
+- `env.add_jars("file:///my/jar/path/connector.jar")`
+- `table_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///my/jar/path/connector.jar")`
 
-Now we need to provide a `file://` path to the `iceberg-flink-runtime` jar, 
which we can either get by building the project
-and looking at `<iceberg-root-dir>/flink-runtime/build/libs`, or downloading 
it from the [Apache official 
repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/).
-Third-party libs can be added to `pyflink` via 
`env.add_jars("file:///my/jar/path/connector.jar")` / 
`table_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///my/jar/path/connector.jar")`, which is also mentioned in the official 
[docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/).
-In our example we're using `env.add_jars(..)` as shown below:
+This is also mentioned in the official 
[docs](https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/dependency_management/).
 The example below uses `env.add_jars(..)`:
 
 ```python
 import os
@@ -175,27 +130,52 @@ import os
 from pyflink.datastream import StreamExecutionEnvironment
 
 env = StreamExecutionEnvironment.get_execution_environment()
-iceberg_flink_runtime_jar = os.path.join(os.getcwd(), 
"iceberg-flink-runtime-{{% icebergVersion %}}.jar")
+iceberg_flink_runtime_jar = os.path.join(os.getcwd(), 
"iceberg-flink-runtime-1.16-{{% icebergVersion %}}.jar")
 
 env.add_jars("file://{}".format(iceberg_flink_runtime_jar))
 ```
 
-Once we reached this point, we can then create a `StreamTableEnvironment` and 
execute Flink SQL statements. 
-The below example shows how to create a custom catalog via the Python Table 
API:
+Next, create a `StreamTableEnvironment` and execute Flink SQL statements. The 
example below shows how to create a custom catalog via the Python Table API:
+
 ```python
 from pyflink.table import StreamTableEnvironment
 table_env = StreamTableEnvironment.create(env)
-table_env.execute_sql("CREATE CATALOG my_catalog WITH ("
-                      "'type'='iceberg', "
-                      "'catalog-impl'='com.my.custom.CatalogImpl', "
-                      "'my-additional-catalog-config'='my-value')")
+table_env.execute_sql("""
+CREATE CATALOG my_catalog WITH (
+    'type'='iceberg', 
+    'catalog-impl'='com.my.custom.CatalogImpl',
+    'my-additional-catalog-config'='my-value'
+)
+""")
+```
+
+Run a query:
+
+```python
+(table_env
+    .sql_query("SELECT PULocationID, DOLocationID, passenger_count FROM 
my_catalog.nyc.taxis LIMIT 5")
+    .execute()
+    .print()) 
+```
+
+```
++----+----------------------+----------------------+--------------------------------+
+| op |         PULocationID |         DOLocationID |                
passenger_count |
++----+----------------------+----------------------+--------------------------------+
+| +I |                  249 |                   48 |                           
 1.0 |
+| +I |                  132 |                  233 |                           
 1.0 |
+| +I |                  164 |                  107 |                           
 1.0 |
+| +I |                   90 |                  229 |                           
 1.0 |
+| +I |                  137 |                  249 |                           
 1.0 |
++----+----------------------+----------------------+--------------------------------+
+5 rows in set
 ```
 
-For more details, please refer to the [Python Table 
API](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/intro_to_table_api/).
+For more details, please refer to the [Python Table 
API](https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/table/intro_to_table_api/).
 
 ## Creating catalogs and using catalogs.
 
-Flink 1.11 support to create catalogs by using flink sql.
+Flink supports creating catalogs by using Flink SQL.
 
 ### Catalog Configuration
 
@@ -220,7 +200,7 @@ The following properties can be set globally and are not 
limited to a specific c
 
 ### Hive catalog
 
-This creates an iceberg catalog named `hive_catalog` that can be configured 
using `'catalog-type'='hive'`, which loads tables from a hive metastore:
+This creates an Iceberg catalog named `hive_catalog` that can be configured 
using `'catalog-type'='hive'`, which loads tables from a Hive metastore:
 
 ```sql
 CREATE CATALOG hive_catalog WITH (
@@ -238,7 +218,7 @@ The following properties can be set if using the Hive 
catalog:
 * `uri`: The Hive metastore's thrift URI. (Required)
 * `clients`: The Hive metastore client pool size, default value is 2. 
(Optional)
 * `warehouse`: The Hive warehouse location, users should specify this path if 
neither set the `hive-conf-dir` to specify a location containing a 
`hive-site.xml` configuration file nor add a correct `hive-site.xml` to 
classpath.
-* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` 
configuration file which will be used to provide custom Hive configuration 
values. The value of `hive.metastore.warehouse.dir` from 
`<hive-conf-dir>/hive-site.xml` (or hive configure file from classpath) will be 
overwrote with the `warehouse` value if setting both `hive-conf-dir` and 
`warehouse` when creating iceberg catalog.
+* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` 
configuration file which will be used to provide custom Hive configuration 
values. The value of `hive.metastore.warehouse.dir` from 
`<hive-conf-dir>/hive-site.xml` (or the Hive configuration file from the 
classpath) will be overwritten with the `warehouse` value if both 
`hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog.
 * `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and 
`hdfs-site.xml` configuration files which will be used to provide custom Hadoop 
configuration values. 
 
 ### Hadoop catalog
@@ -258,7 +238,7 @@ The following properties can be set if using the Hadoop 
catalog:
 
 * `warehouse`: The HDFS directory to store metadata files and data files. 
(Required)
 
-We could execute the sql command `USE CATALOG hive_catalog` to set the current 
catalog.
+Execute the SQL command `USE CATALOG hadoop_catalog` to set the current 
catalog.
 
 ### REST catalog
 
@@ -280,7 +260,7 @@ The following properties can be set if using the REST 
catalog:
 
 ### Custom catalog
 
-Flink also supports loading a custom Iceberg `Catalog` implementation by 
specifying the `catalog-impl` property. Here is an example:
+Flink also supports loading a custom Iceberg `Catalog` implementation by 
specifying the `catalog-impl` property:
 
 ```sql
 CREATE CATALOG my_catalog WITH (
@@ -292,7 +272,7 @@ CREATE CATALOG my_catalog WITH (
 
 ### Create through YAML config
 
-Catalogs can be registered in `sql-client-defaults.yaml` before starting the 
SQL client. Here is an example:
+Catalogs can be registered in `sql-client-defaults.yaml` before starting the 
SQL client.
 
 ```yaml
 catalogs: 
@@ -304,8 +284,8 @@ catalogs:
 
 ### Create through SQL Files
 
-Since the `sql-client-defaults.yaml` file was removed in flink 1.14, SQL 
Client supports the -i startup option to execute an initialization SQL file to 
setup environment when starting up the SQL Client.
-An example of such a file is presented below.
+The Flink SQL Client supports the `-i` startup option to execute an 
initialization SQL file to set up the environment when starting up the SQL Client.
+
 ```sql
 -- define available catalogs
 CREATE CATALOG hive_catalog WITH (
@@ -317,18 +297,18 @@ CREATE CATALOG hive_catalog WITH (
 
 USE CATALOG hive_catalog;
 ```
-using -i <init.sql> option to initialize SQL Client session
+
+Use the `-i <init.sql>` option to initialize the SQL Client session:
+
 ```bash
 /path/to/bin/sql-client.sh -i /path/to/init.sql
 ```
 
-
-
 ## DDL commands
 
 ### `CREATE DATABASE`
 
-By default, iceberg will use the `default` database in flink. Using the 
following example to create a separate database if we don't want to create 
tables under the `default` database:
+By default, Iceberg will use the `default` database in Flink. Use the 
following example to create a separate database in order to avoid creating 
tables under the `default` database:
 
 ```sql
 CREATE DATABASE iceberg_db;
@@ -344,11 +324,11 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
 );
 ```
 
-Table create commands support the most commonly used [flink create 
clauses](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table)
 now, including: 
+Table create commands support the commonly used [Flink create 
clauses](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/)
 including: 
 
-* `PARTITION BY (column1, column2, ...)` to configure partitioning, apache 
flink does not yet support hidden partitioning.
+* `PARTITIONED BY (column1, column2, ...)` to configure partitioning. Flink does 
not yet support hidden partitioning.
 * `COMMENT 'table document'` to set a table description.
-* `WITH ('key'='value', ...)` to set [table configuration](../configuration) 
which will be stored in apache iceberg table properties.
+* `WITH ('key'='value', ...)` to set [table configuration](../configuration) 
which will be stored in Iceberg table properties.
 
 Currently, it does not support computed column, primary key and watermark 
definition etc.
 
@@ -363,7 +343,7 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
 ) PARTITIONED BY (data);
 ```
 
-Apache Iceberg support hidden partition but apache flink don't support 
partitioning by a function on columns, so we've no way to support hidden 
partition in flink DDL now, we will improve apache flink DDL in future.
+Iceberg supports hidden partitioning, but Flink doesn't support partitioning by a 
function on columns, so there is no way to support hidden partitioning in Flink 
DDL.
 
 ### `CREATE TABLE LIKE`
 
@@ -378,12 +358,12 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
 CREATE TABLE  `hive_catalog`.`default`.`sample_like` LIKE 
`hive_catalog`.`default`.`sample`;
 ```
 
-For more details, refer to the [Flink `CREATE TABLE` 
documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table).
+For more details, refer to the [Flink `CREATE TABLE` 
documentation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/).
 
 
 ### `ALTER TABLE`
 
-Iceberg only support altering table properties in flink 1.11 now.
+Iceberg only supports altering table properties:
 
 ```sql
 ALTER TABLE `hive_catalog`.`default`.`sample` SET 
('write.format.default'='avro')
@@ -405,7 +385,7 @@ DROP TABLE `hive_catalog`.`default`.`sample`;
 
 ## Querying with SQL
 
-Iceberg support both streaming and batch read in flink now. we could execute 
the following sql command to switch the execute type from 'streaming' mode to 
'batch' mode, and vice versa:
+Iceberg supports both streaming and batch reads in Flink. Execute the following 
SQL command to switch the execution mode from `streaming` to `batch`, and vice 
versa:
 
 ```sql
 -- Execute the flink job in streaming mode for current session context
@@ -417,12 +397,12 @@ SET execution.runtime-mode = batch;
 
 ### Flink batch read
 
-If want to check all the rows in iceberg table by submitting a flink __batch__ 
job, you could execute the following sentences:
+Submit a Flink __batch__ job using the following statements:
 
 ```sql
 -- Execute the flink job in batch mode for current session context
 SET execution.runtime-mode = batch;
-SELECT * FROM sample       ;
+SELECT * FROM sample;
 ```
 
 ### Flink streaming read
@@ -443,13 +423,11 @@ SELECT * FROM sample /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s')*/
 SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 
'start-snapshot-id'='3821550127947089987')*/ ;
 ```
 
-There are some options that could be set in flink SQL hint options for 
streaming job, see [read options](#Read-options) for details.
+Some options can be set in Flink SQL hint options for streaming 
jobs; see [read options](#Read-options) for details.
 
 ### FLIP-27 source for SQL
 
-Here are the SQL settings for the FLIP-27 source, which is only available
-for Flink 1.14 or above.  All other SQL settings and options
-documented above are applicable to the FLIP-27 source.
+Here are the SQL settings for the 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 source. All other SQL settings and options documented above are applicable to 
the FLIP-27 source.
 
 ```sql
 -- Opt in the FLIP-27 source. Default is false.
@@ -458,11 +436,11 @@ SET table.exec.iceberg.use-flip27-source = true;
 
 ## Writing with SQL
 
-Iceberg support both `INSERT INTO` and `INSERT OVERWRITE` in flink 1.11 now.
+Iceberg supports both `INSERT INTO` and `INSERT OVERWRITE`.
 
 ### `INSERT INTO`
 
-To append new data to a table with a flink streaming job, use `INSERT INTO`:
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
 
 ```sql
 INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
@@ -494,7 +472,7 @@ Iceberg supports `UPSERT` based on the primary key when 
writing data into v2 tab
 
 1. Enable the `UPSERT` mode as table-level property `write.upsert.enabled`. 
Here is an example SQL statement to set the table property when creating a 
table. It would be applied for all write paths to this table (batch or 
streaming) unless overwritten by write options as described later.
 
-```
+```sql
 CREATE TABLE `hive_catalog`.`default`.`sample` (
   `id`  INT UNIQUE COMMENT 'unique id',
   `data` STRING NOT NULL,
@@ -504,7 +482,7 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
 
 2. Enabling `UPSERT` mode using `upsert-enabled` in the [write 
options](#Write-options) provides more flexibility than a table level config. 
Note that you still need to use v2 table format and specify the primary key 
when creating the table.
 
-```
+```sql
 INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
@@ -558,7 +536,7 @@ stream.print();
 env.execute("Test Iceberg Streaming Read");
 ```
 
-There are other options that we could set by Java API, please see the 
[FlinkSource#Builder](../../../javadoc/{{% icebergVersion 
%}}/org/apache/iceberg/flink/source/FlinkSource.html).
+Other options can be set as well; please see the 
[FlinkSource#Builder](../../../javadoc/{{% icebergVersion 
%}}/org/apache/iceberg/flink/source/FlinkSource.html).
 
 ## Reading with DataStream (FLIP-27 source)
 
@@ -568,8 +546,7 @@ streaming source interface. It also unifies the source 
interfaces for both batch
 Most source connectors (like Kafka, file) in Flink repo have  migrated to the 
FLIP-27 interface.
 Flink is planning to deprecate the old `SourceFunction` interface in the near 
future.
 
-A FLIP-27 based Flink `IcebergSource` is added in `iceberg-flink` module for 
Flink 1.14 or above.
-The FLIP-27 `IcebergSource` is currently an experimental feature.
+A FLIP-27 based Flink `IcebergSource` is available in the `iceberg-flink` module. The 
FLIP-27 `IcebergSource` is currently an experimental feature.
 
 ### Batch Read
 
@@ -628,7 +605,7 @@ stream.print();
 env.execute("Test Iceberg Streaming Read");
 ```
 
-There are other options that we could set by Java API, please see the 
+Other options can be set through the Java API; please see the 
 [IcebergSource#Builder](../../../javadoc/{{% icebergVersion 
%}}/org/apache/iceberg/flink/source/IcebergSource.html).
 
 ### Read as Avro GenericRecord
@@ -671,7 +648,7 @@ Iceberg support writing to iceberg table from different 
DataStream input.
 
 ### Appending data.
 
-we have supported writing `DataStream<RowData>` and `DataStream<Row>` to the 
sink iceberg table natively.
+Iceberg supports writing `DataStream<RowData>` and `DataStream<Row>` to the sink 
Iceberg table natively.
 
 ```java
 StreamExecutionEnvironment env = ...;
@@ -687,11 +664,11 @@ FlinkSink.forRowData(input)
 env.execute("Test Iceberg DataStream");
 ```
 
-The iceberg API also allows users to write generic `DataStream<T>` to iceberg 
table, more example could be found in this [unit 
test](https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+The Iceberg API also allows users to write a generic `DataStream<T>` to an Iceberg 
table. More examples can be found in this [unit 
test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
 
 ### Overwrite data
 
-To overwrite the data in existing iceberg table dynamically, we could set the 
`overwrite` flag in FlinkSink builder.
+Set the `overwrite` flag in the FlinkSink builder to overwrite the data in 
existing Iceberg tables:
 
 ```java
 StreamExecutionEnvironment env = ...;
@@ -710,7 +687,7 @@ env.execute("Test Iceberg DataStream");
 
 ### Upsert data
 
-To upsert the data in existing iceberg table, we could set the `upsert` flag 
in FlinkSink builder. The table must use v2 table format and have a primary key.
+Set the `upsert` flag in the FlinkSink builder to upsert the data in an existing 
Iceberg table. The table must use the v2 table format and have a primary key.
 
 ```java
 StreamExecutionEnvironment env = ...;
@@ -747,8 +724,10 @@ DataStream<org.apache.avro.generic.GenericRecord> 
dataStream = ...;
 
 Schema icebergSchema = table.schema();
 
-// if the Iceberg table schema contains time fields, we can't use
-// Avro schema converted from Iceberg schema via AvroSchemaUtil.
+
+// The Avro schema converted from Iceberg schema can't be used
+// due to precision difference between how Iceberg schema (micro)
+// and Flink AvroToRowDataConverters (milli) deal with time type.
 // Instead, use the Avro schema defined directly.
 // See AvroGenericRecordToRowDataMapper Javadoc for more details.
 org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, 
table.name());
@@ -765,7 +744,7 @@ FlinkSink.builderFor(
   .append();
 ```
 
-### monitoring metrics
+### Metrics
 
 The following Flink metrics are provided by the Flink Iceberg sink.
 
@@ -806,14 +785,12 @@ It could happen that Iceberg commits failed (for whatever 
reason), while Flink c
 * It could also happen that `notifyCheckpointComplete` wasn't triggered (for 
whatever bug).
 As a result, there won't be any Iceberg commits attempted.
 
-If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes,
-we can set up alert with rule like `elapsedSecondsSinceLastSuccessfulCommit > 
60 minutes`.
-Then we can detect failed or missing Iceberg commits in 60 minutes.
+If the checkpoint interval (and expected Iceberg commit interval) is 5 
minutes, set up an alert with a rule like `elapsedSecondsSinceLastSuccessfulCommit > 
60 minutes` to detect failed or missing Iceberg commits in the past hour.
 
 ## Options
 ### Read options
 
-Flink read options are passed when configuring the Flink IcebergSource, like 
this:
+Flink read options are passed when configuring the Flink IcebergSource:
 
 ```
 IcebergSource.forRowData()
@@ -1155,7 +1132,7 @@ Iceberg types are converted to Flink types according to 
the following table:
 
 ## Future improvement.
 
-There are some features that we do not yet support in the current flink 
iceberg integration work:
+There are some features that are not yet supported in the current Flink 
Iceberg integration work:
 
 * Don't support creating iceberg table with hidden partitioning. 
[Discussion](http://mail-archives.apache.org/mod_mbox/flink-dev/202008.mbox/%3ccabi+2jqco3msoa4+ywaxv5j-z8tgknzdx-pqlyb-dg+dvum...@mail.gmail.com%3e)
 in flink mail list.
 * Don't support creating iceberg table with computed column.
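
Note on the jar names used in the diff above: the download snippet references `${FLINK_VERSION_MAJOR}` without defining it, while the surrounding text uses `iceberg-flink-runtime-1.16`. The Python sketch below assumes `FLINK_VERSION_MAJOR` means the `major.minor` prefix of `FLINK_VERSION` and shows how the Maven URL and the `file://` path expected by `env.add_jars(..)` could be derived from it. The helper names are hypothetical, and `1.1.0` is only an example Iceberg version standing in for `{{% icebergVersion %}}`.

```python
from pathlib import Path

# Same Maven base URL as the bash snippet in the diff.
MAVEN_URL = "https://repo1.maven.org/maven2"


def flink_major(flink_version: str) -> str:
    """Return the 'major.minor' prefix of a Flink version, e.g. '1.16.1' -> '1.16'.

    Assumption: this is what FLINK_VERSION_MAJOR is meant to hold.
    """
    parts = flink_version.split(".")
    if len(parts) < 2 or not all(parts[:2]):
        raise ValueError(f"expected a version like '1.16.1', got {flink_version!r}")
    return ".".join(parts[:2])


def runtime_jar_name(flink_version: str, iceberg_version: str) -> str:
    """Build the runtime jar file name for a Flink/Iceberg version pair."""
    return f"iceberg-flink-runtime-{flink_major(flink_version)}-{iceberg_version}.jar"


def runtime_jar_url(flink_version: str, iceberg_version: str) -> str:
    """Build the Maven download URL, mirroring the wget line in the diff."""
    artifact = f"iceberg-flink-runtime-{flink_major(flink_version)}"
    jar = runtime_jar_name(flink_version, iceberg_version)
    return f"{MAVEN_URL}/org/apache/iceberg/{artifact}/{iceberg_version}/{jar}"


def jar_file_uri(directory: str, flink_version: str, iceberg_version: str) -> str:
    """Return an absolute file:// URI for the jar inside `directory`."""
    jar_path = Path(directory).resolve() / runtime_jar_name(flink_version, iceberg_version)
    return jar_path.as_uri()


if __name__ == "__main__":
    print(runtime_jar_url("1.16.1", "1.1.0"))
    print(jar_file_uri("lib", "1.16.1", "1.1.0"))
```

The value returned by `jar_file_uri` can be passed directly to `env.add_jars(..)`. Using `Path.as_uri()` instead of string formatting keeps the URI valid when the directory contains spaces or other characters that need escaping.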
