This is an automated email from the ASF dual-hosted git repository.
bhavanisudha pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 2017bbe218e [Docs] Flink quickstart and sql website updates (#9952)
2017bbe218e is described below
commit 2017bbe218edcea0846e1ad3c6bd745c15b763c9
Author: Aditya Goenka <[email protected]>
AuthorDate: Fri Dec 1 06:20:31 2023 +0530
[Docs] Flink quickstart and sql website updates (#9952)
* Updated Flink Datastream Code and SQL
* Done minor updates
* Updated table name at couple of places
* Updated Flink DML page
* Resolved few review comments
* Taken care of review comments
* Minor fixes
---------
Co-authored-by: Bhavani Sudha Saktheeswaran
<[email protected]>
---
website/docs/flink-quick-start-guide.md | 141 ++++++++++++++++++++++----------
website/docs/sql_ddl.md | 108 ++++++++++++++++++++++--
website/docs/sql_dml.md | 72 +++++++++++++++-
website/docs/sql_queries.md | 10 ++-
website/docs/writing_data.md | 16 +++-
5 files changed, 290 insertions(+), 57 deletions(-)
diff --git a/website/docs/flink-quick-start-guide.md
b/website/docs/flink-quick-start-guide.md
index 2198959dbd5..48f48cd4176 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -7,29 +7,26 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
This page introduces the Flink-Hudi integration and shows how Flink brings the power of streaming into Hudi.
-This guide helps you quickly start using Flink on Hudi, and learn different
modes for reading/writing Hudi by Flink.
## Setup
-<Tabs
-defaultValue="flinksql"
-values={[
-{ label: 'Flink SQL', value: 'flinksql', },
-{ label: 'DataStream API', value: 'dataStream', },
-]}
->
-<TabItem value="flinksql">
-We use the [Flink Sql
Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/)
because it's a good
-quick start tool for SQL users.
+### Flink Support Matrix
-### Download Flink
-Hudi works with Flink 1.13, Flink 1.14, Flink 1.15, Flink 1.16 and Flink 1.17.
You can follow the
-instructions [here](https://flink.apache.org/downloads) for setting up Flink.
+| Hudi | Supported Flink version |
+|:----------------|:----------------------------------------|
+| 0.14.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x, 1.17.x |
+| 0.13.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x |
+| 0.12.x | 1.13.x, 1.14.x, 1.15.x |
+| 0.11.x | 1.13.x, 1.14.x |
+
-### Start Flink cluster
-Start a standalone Flink cluster within hadoop environment. In case we are
trying on local setup, then we could download hadoop binaries and set
HADOOP_HOME.
+### Download Flink and Start Flink cluster
+
+Hudi works with Flink 1.13, Flink 1.14, Flink 1.15, Flink 1.16 and Flink 1.17. You can follow the
+instructions [here](https://flink.apache.org/downloads) for setting up Flink. Then, start a standalone Flink cluster
+within a Hadoop environment. For a local setup, download the Hadoop binaries and set HADOOP_HOME.
```bash
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
@@ -38,6 +35,26 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the Flink standalone cluster
./bin/start-cluster.sh
```
+<div className="notice--info">
+ <h4>Please note the following: </h4>
+<ul>
+  <li>We suggest Hadoop 2.9.x+ because some object stores only have filesystem implementations from that version onward</li>
+ <li>The flink-parquet and flink-avro formats are already packaged into the
hudi-flink-bundle jar</li>
+</ul>
+</div>
+
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+<TabItem value="flinksql">
+
+We use the [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/) because it is a good quick-start tool for SQL users.
+
### Start Flink SQL client
Hudi supports packaged bundle jar for Flink, which should be loaded in the
Flink SQL Client when it starts up.
@@ -54,26 +71,6 @@ wget
https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink${FLINK_VERSION}-b
./bin/sql-client.sh embedded -j
lib/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar shell
```
-
-### Flink Support Matrix
-
-
-| Hudi | Supported Flink version |
-|:----------------|:----------------------------------------|
-| 0.14.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x, 1.17.x |
-| 0.13.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x |
-| 0.12.x | 1.13.x, 1.14.x, 1.15.x |
-| 0.11.x | 1.13.x, 1.14.x |
-
-
-<div className="notice--info">
- <h4>Please note the following: </h4>
-<ul>
- <li>We suggest hadoop 2.9.x+ version because some of the object storage has
filesystem implementation only after that</li>
- <li>The flink-parquet and flink-avro formats are already packaged into the
hudi-flink-bundle jar</li>
-</ul>
-</div>
-
Setup table name, base path and operate using SQL for this guide.
The SQL CLI only executes the SQL line by line.
</TabItem>
@@ -164,7 +161,7 @@ values={[
<TabItem value="flinksql">
-Creates a Flink Hudi table first and insert data into the Hudi table using SQL
`VALUES` as below.
+Insert data into the Hudi table using SQL `VALUES`.
```sql
-- insert data using values
@@ -183,7 +180,7 @@ VALUES
<TabItem value="dataStream">
-Creates a Flink Hudi table first and insert data into the Hudi table using
DataStream API as below.
+Add a streaming source to Flink and load the data into the Hudi table. Since this is the first write, it will also auto-create the table.
```java
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -215,9 +212,9 @@ HoodiePipeline.Builder builder =
HoodiePipeline.builder(targetTable)
builder.sink(dataStream, false); // The second parameter indicating whether
the input data stream is bounded
env.execute("Api_Sink");
-
-// Full Quickstart Example -
https://gist.github.com/ad1happy2go/1716e2e8aef6dcfe620792d6e6d86d36
```
+Refer to the full quickstart example [here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamWriter.java)
+
</TabItem>
</Tabs
@@ -275,6 +272,8 @@ DataStream<RowData> rowDataDataStream = builder.source(env);
rowDataDataStream.print();
env.execute("Api_Source");
```
+Refer to the full streaming reader example [here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamReader.java)
+
</TabItem>
</Tabs
@@ -287,23 +286,60 @@ Refers to [Table types and
queries](/docs/concepts#table-types--queries) for mor
This is similar to inserting new data.
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
+
+Hudi tables can be updated either by inserting records with the same primary key or by using a standard UPDATE statement as shown below.
+
```sql
+-- Update Queries only works with batch execution mode
SET 'execution.runtime-mode' = 'batch';
UPDATE hudi_table SET fare = 25.0 WHERE uuid =
'334e26e9-8355-45cc-97c6-c31daf0df330';
```
-Notice that the save mode is now `Append`. In general, always use append mode
unless you are trying to create the table for the first time.
-[Querying](#query-data) the data again will now show updated records. Each
write operation generates a new [commit](/docs/concepts)
-denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `age`
fields for the same `_hoodie_record_key`s in previous commit.
-
:::note
The `UPDATE` statement is supported since Flink 1.17, so only Hudi Flink
bundle compiled with Flink 1.17+ supplies this functionality.
-Only **batch** queries on Hudi table with primary key work correctly.
+Only **batch** queries on Hudi table with primary key work correctly.
:::
+</TabItem>
+
+<TabItem value="dataStream">
+
+Add a streaming source to Flink and load the data into the Hudi table using the DataStream API as [above](#insert-data).
+When new rows with the same primary key arrive in the stream, they will be updated.
+In the insert example, an incoming row with the same record ID will be updated.
+
+Refer to the update example [here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamWriter.java)
+
+</TabItem>
+
+</Tabs
+>
+
+[Querying](#query-data) the data again will now show updated records. Each
write operation generates a new [commit](/docs/concepts)
+denoted by the timestamp.
+
## Delete Data {#deletes}
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
### Row-level Delete
+
When consuming data in a streaming query, the Hudi Flink source can also accept change logs from the upstream data source. If the `RowKind` is set up per row, it can then apply UPDATEs and DELETEs at the row level. You can then sync a near-real-time snapshot on Hudi for all kinds
of RDBMS.
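+
+As a sketch, changelog consumption can be enabled with the `changelog.enabled` table option (assumed here per the Hudi Flink configs; table name and path are illustrative):
+
+```sql
+CREATE TABLE hudi_table(
+  id BIGINT PRIMARY KEY NOT ENFORCED,
+  name STRING,
+  price DOUBLE
+)
+WITH (
+'connector' = 'hudi',
+'path' = 'file:///tmp/hudi_table',
+'table.type' = 'MERGE_ON_READ',
+'changelog.enabled' = 'true' -- retain upstream changelog records, including deletes
+);
+```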
@@ -322,6 +358,21 @@ The `DELETE` statement is supported since Flink 1.17, so
only Hudi Flink bundle
Only **batch** queries on Hudi table with primary key work correctly.
:::
+</TabItem>
+
+<TabItem value="dataStream">
+
+Create a Flink Hudi table first and insert data into it using the DataStream API as below.
+When new rows with the same primary key and a RowKind of DELETE arrive in the stream, they will be deleted.
+
+Refer to the delete example [here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamWriter.java)
+
+</TabItem
+>
+
+</Tabs
+>
+
## Streaming Query
Hudi Flink also provides the capability to obtain a stream of records that changed since a given commit timestamp.
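+
+A minimal streaming read sketch (options as documented in the query options; `earliest` consumes the full history):
+
+```sql
+select * from hudi_table/*+ OPTIONS('read.streaming.enabled'='true', 'read.start-commit'='earliest')*/;
+```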
diff --git a/website/docs/sql_ddl.md b/website/docs/sql_ddl.md
index e3d4ad3419f..5b5fafe3962 100644
--- a/website/docs/sql_ddl.md
+++ b/website/docs/sql_ddl.md
@@ -522,18 +522,73 @@ CREATE CATALOG hoodie_catalog
### Create Table
-The following is an example of creating a Flink table. Read the [Flink Quick
Start](/docs/flink-quick-start-guide) guide for more examples.
+You can create tables using the standard Flink SQL CREATE TABLE syntax, which supports partitioning and passing Flink options using the WITH clause.
+
+```sql
+CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
+ (
+  { <physical_column_definition> }[ , ...n]
+  [ <table_constraint> ][ , ...n]
+ )
+ [COMMENT table_comment]
+ [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
+ WITH (key1=val1, key2=val2, ...)
+```
+
+### Create non-partitioned table
+
+Creating a non-partitioned table is as simple as creating a regular table.
+
+```sql
+-- create a Hudi table
+CREATE TABLE hudi_table(
+ id BIGINT,
+ name STRING,
+ price DOUBLE
+)
+WITH (
+'connector' = 'hudi',
+'path' = 'file:///tmp/hudi_table',
+'table.type' = 'MERGE_ON_READ'
+);
+```
+
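+A quick usage sketch (assuming the table above and batch execution mode): write a row with SQL `VALUES` and read it back.
+
+```sql
+-- insert a row and query it back
+INSERT INTO hudi_table VALUES (1, 'a1', 20.0);
+SELECT * FROM hudi_table;
+```
+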
+### Create partitioned table
+
+The following is an example of creating a Flink partitioned table.
```sql
-CREATE TABLE hudi_table2(
- id int,
- name string,
- price double
+CREATE TABLE hudi_table(
+ id BIGINT,
+ name STRING,
+ dt STRING,
+ hh STRING
+)
+PARTITIONED BY (`dt`)
+WITH (
+'connector' = 'hudi',
+'path' = 'file:///tmp/hudi_table',
+'table.type' = 'MERGE_ON_READ'
+);
+```
+
+### Create table with record keys and ordering fields
+
+The following is an example of creating a Flink table with a record key and an ordering field, similar to Spark.
+
+```sql
+CREATE TABLE hudi_table(
+ id BIGINT PRIMARY KEY NOT ENFORCED,
+ name STRING,
+ price DOUBLE,
+ ts BIGINT
)
WITH (
'connector' = 'hudi',
-'path' = 's3://bucket-name/hudi/',
-'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, default
is COPY_ON_WRITE
+'path' = 'file:///tmp/hudi_table',
+'table.type' = 'MERGE_ON_READ',
+'precombine.field' = 'ts'
);
```
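+
+A sketch of how the ordering field behaves with the table above (assuming upsert semantics): when two rows share the same key, the row with the larger `ts` is retained.
+
+```sql
+INSERT INTO hudi_table VALUES (1, 'a1', 20.0, 1000);
+INSERT INTO hudi_table VALUES (1, 'a1', 25.0, 2000);
+-- a subsequent read should return the single row with ts = 2000
+SELECT * FROM hudi_table;
+```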
@@ -542,6 +597,45 @@ WITH (
ALTER TABLE tableA RENAME TO tableB;
```
+### Setting Hudi configs
+
+#### Using table options
+You can configure Hudi configs in table options when creating a table. Refer to the Flink-specific Hudi configs [here](/docs/next/configurations#FLINK_SQL).
+These configs will be applied to all operations on that table.
+
+```sql
+CREATE TABLE IF NOT EXISTS tableName (
+ colName1 colType1 PRIMARY KEY NOT ENFORCED,
+ colName2 colType2,
+ ...
+)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = '${path}',
+ ${hoodie.config.key1} = '${hoodie.config.value1}',
+ ${hoodie.config.key2} = '${hoodie.config.value2}',
+ ....
+);
+
+-- e.g.
+CREATE TABLE hudi_table(
+ id BIGINT PRIMARY KEY NOT ENFORCED,
+ name STRING,
+ price DOUBLE,
+ ts BIGINT
+)
+WITH (
+'connector' = 'hudi',
+'path' = 'file:///tmp/hudi_table',
+'table.type' = 'MERGE_ON_READ',
+'precombine.field' = 'ts',
+'hoodie.cleaner.fileversions.retained' = '20',
+'hoodie.keep.max.commits' = '20',
+'hoodie.datasource.write.hive_style_partitioning' = 'true'
+);
+```
+
## Supported Types
| Spark | Hudi | Notes |
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 5ab27094343..10219843564 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -160,7 +160,7 @@ For a Hudi table with user configured primary keys, the
join condition in `Merge
For a Table where Hudi auto generates primary keys, the join condition in MIT
can be on any arbitrary data columns.
:::
-### Delete Data
+### Delete From
You can remove data from a Hudi table using the `DELETE FROM` statement.
@@ -199,7 +199,73 @@ You can control the behavior of these operations using
various configuration opt
## Flink
-Flink SQL also provides several Data Manipulation Language (DML) actions for
interacting with Hudi tables. All these operations are already
-showcased in the [Flink Quickstart](/docs/flink-quick-start-guide).
+Flink SQL provides several Data Manipulation Language (DML) actions for
interacting with Hudi tables. These operations allow you to insert, update and
delete data from your Hudi tables. Let's explore them one by one.
+### Insert Into
+
+You can use the INSERT INTO statement to add data to a Hudi table using Flink SQL. Here are a few illustrative examples:
+
+```sql
+INSERT INTO <table>
+SELECT <columns> FROM <source>;
+```
+
+Examples:
+
+```sql
+-- Insert into a Hudi table
+INSERT INTO hudi_table SELECT 1, 'a1', 20;
+```
+
+If the `write.operation` is 'upsert', the INSERT INTO statement will not only insert new records but also update existing rows with the same record key.
+
+```sql
+-- Insert into a Hudi table in upsert mode
+INSERT INTO hudi_table/*+ OPTIONS('write.operation'='upsert')*/ SELECT 1,
'a1', 20;
+```
+
+### Update
+With Flink SQL, you can use the UPDATE command to update a Hudi table. Here are a few illustrative examples:
+
+```sql
+UPDATE tableIdentifier SET column = EXPRESSION [, column = EXPRESSION ...] [ WHERE boolExpression ]
+```
+
+```sql
+UPDATE hudi_table SET price = price * 2, ts = 1111 WHERE id = 1;
+```
+:::note Key requirements
+Update queries only work with batch execution mode.
+:::
+
+### Delete From
+With Flink SQL, you can use the DELETE FROM command to delete rows from a Hudi table. Here are a few illustrative examples:
+
+```sql
+DELETE FROM tableIdentifier [ WHERE boolExpression ]
+```
+
+```sql
+DELETE FROM hudi_table WHERE price < 100;
+```
+
+
+:::note Key requirements
+Delete queries only work with batch execution mode.
+:::
+
+### Setting Writer/Reader Configs
+With Flink SQL, you can additionally set writer/reader configs along with the query.
+
+```sql
+INSERT INTO hudi_table/*+
OPTIONS('${hoodie.config.key1}'='${hoodie.config.value1}')*/
+```
+
+```sql
+INSERT INTO hudi_table/*+ OPTIONS('hoodie.keep.max.commits'='20')*/
+```
diff --git a/website/docs/sql_queries.md b/website/docs/sql_queries.md
index b7992d5ef6d..47c507c9bfb 100644
--- a/website/docs/sql_queries.md
+++ b/website/docs/sql_queries.md
@@ -102,6 +102,10 @@ relying on the custom Hudi input formats like Hive.
Typically, notebook users an
By default, Flink SQL will try to use its optimized native readers (for e.g.
reading parquet files) instead of Hive SerDes.
Additionally, partition pruning is applied by Flink if a partition predicate
is specified in the filter. Filters push down may not be supported yet (please
check Flink roadmap).
+```sql
+select * from hudi_table/*+ OPTIONS('metadata.enabled'='true',
'read.data.skipping.enabled'='false','hoodie.metadata.index.column.stats.enable'='true')*/;
+```
+
#### Options
| Option Name | Required | Default | Remarks |
| ----------- | ------- | ------- | ------- |
@@ -116,7 +120,7 @@ mode by setting option `read.streaming.enabled` as `true`.
Sets up option `read.
value as `earliest` if you want to consume all the history data set.
```sql
--- Show an example query.
+select * from hudi_table/*+ OPTIONS('read.streaming.enabled'='true',
'read.start-commit'='earliest')*/;
```
#### Options
@@ -142,6 +146,10 @@ There are 3 use cases for incremental query:
the interval is a closed one: both start commit and end commit are
inclusive;
3. Time Travel: consume as batch for an instant time, specify the
`read.end-commit` is enough because the start commit is latest by default.
+```sql
+select * from hudi_table/*+ OPTIONS('read.start-commit'='earliest',
'read.end-commit'='20231122155636355')*/;
+```
+
#### Options
| Option Name | Required | Default | Remarks |
| ----------- | ------- | ------- | ------- |
diff --git a/website/docs/writing_data.md b/website/docs/writing_data.md
index 522259fab9e..01de87fe660 100644
--- a/website/docs/writing_data.md
+++ b/website/docs/writing_data.md
@@ -9,7 +9,15 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
In this section, we will cover ways to ingest new changes from external
sources or even other Hudi tables.
-The two main tools available are the [Hudi
Streamer](/docs/hoodie_streaming_ingestion#hudi-streamer) tool, as well as the
[Spark Hudi datasource](#spark-datasource-writer).
+Currently, Hudi supports the following ways to write data:
+- [Hudi Streamer](/docs/hoodie_streaming_ingestion#hudi-streamer)
+- [Spark Hudi Datasource](#spark-datasource-writer)
+- [Spark Structured
Streaming](/docs/hoodie_streaming_ingestion#structured-streaming)
+- [Spark SQL](/docs/next/sql_ddl#spark-sql)
+- [Flink Writer](/docs/next/hoodie_streaming_ingestion#flink-ingestion)
+- [Flink SQL](/docs/next/sql_ddl#flink)
+- [Java Writer](#java-writer)
+- [Kafka Connect](/docs/next/hoodie_streaming_ingestion#kafka-connect-sink)
## Spark Datasource Writer
@@ -534,6 +542,7 @@ INSERT INTO hudi_table select ... from ...;
**Note**: INSERT OVERWRITE is not supported yet but already on the roadmap.
+
### Non-Blocking Concurrency Control (Experimental)
Hudi Flink supports a new non-blocking concurrency control mode, where
multiple writer tasks can be executed
@@ -609,3 +618,8 @@ to `NON_BLOCKING_CONCURRENCY_CONTROL`. The `write.tasks`
option is used to speci
be used for writing to the table. The `compaction.schedule.enabled`,
`compaction.async.enabled`
and `clean.async.enabled` options are used to disable the compaction and
cleaning services for the second pipeline.
This is done to ensure that the compaction and cleaning services are not
executed twice for the same table.
+
+
+## Java Writer
+We can use plain Java to write to Hudi tables. To use the Java client, refer to the example [here](https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java)
+