This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 46445fa14aa [HUDI-6965][Docs] Flink Quickstart Revamp (#9897)
46445fa14aa is described below
commit 46445fa14aa4bfc9d364f228278753870e5c21dc
Author: Aditya Goenka <[email protected]>
AuthorDate: Sat Oct 21 08:32:18 2023 +0530
[HUDI-6965][Docs] Flink Quickstart Revamp (#9897)
---
website/docs/flink-quick-start-guide.md | 313 ++++++++++++++++++--------------
1 file changed, 176 insertions(+), 137 deletions(-)
diff --git a/website/docs/flink-quick-start-guide.md
b/website/docs/flink-quick-start-guide.md
index acec412e41f..cfba29ad8db 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -7,20 +7,9 @@ import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
This page introduces the Flink-Hudi integration and shows how Flink brings
the power of streaming into Hudi.
-This guide helps you quickly start using Flink on Hudi, and learn different
modes for reading/writing Hudi by Flink:
+This guide helps you quickly start using Flink on Hudi, and learn the
different modes for reading from and writing to Hudi with Flink.
-- **Quick Start** : Read [Quick Start](#quick-start) to get started quickly
Flink sql client to write to(read from) Hudi.
-- **Configuration** : For [Global
Configuration](/docs/next/flink_tuning#global-configurations), sets up through
`$FLINK_HOME/conf/flink-conf.yaml`. For per job configuration, sets up through
[Table Option](/docs/next/flink_tuning#table-options).
-- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](/docs/hoodie_streaming_ingestion#cdc-ingestion), [Bulk
Insert](/docs/hoodie_streaming_ingestion#bulk-insert), [Index
Bootstrap](/docs/hoodie_streaming_ingestion#index-bootstrap), [Changelog
Mode](/docs/hoodie_streaming_ingestion#changelog-mode) and [Append
Mode](/docs/hoodie_streaming_ingestion#append-mode).
-- **Querying Data** : Flink supports different modes for reading, such as
[Streaming Query](/docs/querying_data#streaming-query) and [Incremental
Query](/docs/querying_data#incremental-query).
-- **Tuning** : For write/read tasks, this guide gives some tuning suggestions,
such as [Memory Optimization](/docs/next/flink_tuning#memory-optimization) and
[Write Rate Limit](/docs/next/flink_tuning#write-rate-limit).
-- **Optimization**: Offline compaction is supported [Offline
Compaction](/docs/compaction#flink-offline-compaction).
-- **Query Engines**: Besides Flink, many other engines are integrated: [Hive
Query](/docs/syncing_metastore#flink-setup), [Presto
Query](/docs/querying_data#prestodb).
-- **Catalog**: A Hudi specific catalog is supported: [Hudi
Catalog](/docs/querying_data/#hudi-catalog).
-
-## Quick Start
-
-### Setup
+## Setup
<Tabs
defaultValue="flinksql"
values={[
@@ -34,27 +23,13 @@ values={[
We use the [Flink Sql
Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/)
because it's a good
quick start tool for SQL users.
-#### Step.1 download Flink jar
-
-Hudi works with both Flink 1.13, Flink 1.14, Flink 1.15, Flink 1.16 and Flink
1.17. You can follow the
-instructions [here](https://flink.apache.org/downloads) for setting up Flink.
Then choose the desired Hudi-Flink bundle
-jar to work with different Flink and Scala versions:
-
-- `hudi-flink1.13-bundle`
-- `hudi-flink1.14-bundle`
-- `hudi-flink1.15-bundle`
-- `hudi-flink1.16-bundle`
-- `hudi-flink1.17-bundle`
+### Download Flink
-#### Step.2 start Flink cluster
-Start a standalone Flink cluster within hadoop environment.
-Before you start up the cluster, we suggest to config the cluster as follows:
+Hudi works with Flink 1.13, Flink 1.14, Flink 1.15, Flink 1.16 and Flink 1.17.
You can follow the
+instructions [here](https://flink.apache.org/downloads) for setting up Flink.
-- in `$FLINK_HOME/conf/flink-conf.yaml`, add config option
`taskmanager.numberOfTaskSlots: 4`
-- in `$FLINK_HOME/conf/flink-conf.yaml`, [add other global configurations
according to the characteristics of your
task](/docs/next/flink_tuning#global-configurations)
-- in `$FLINK_HOME/conf/workers`, add item `localhost` as 4 lines so that there
are 4 workers on the local cluster
-
-Now starts the cluster:
+### Start Flink cluster
+Start a standalone Flink cluster within a Hadoop environment. For a local
setup, download the Hadoop binaries and set `HADOOP_HOME`. Before starting the
cluster, we suggest configuring `taskmanager.numberOfTaskSlots: 4` in
`$FLINK_HOME/conf/flink-conf.yaml` so the local cluster has enough task slots.
```bash
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
@@ -63,21 +38,34 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the Flink standalone cluster
./bin/start-cluster.sh
```
-#### Step.3 start Flink SQL client
+### Start Flink SQL client
Hudi supports packaged bundle jar for Flink, which should be loaded in the
Flink SQL Client when it starts up.
You can build the jar manually under path
`hudi-source-dir/packaging/hudi-flink-bundle`(see [Build Flink Bundle
Jar](/docs/syncing_metastore#install)), or download it from the
[Apache Official
Repository](https://repo.maven.apache.org/maven2/org/apache/hudi/).
-Now starts the SQL CLI:
+Now start the SQL CLI:
```bash
-# HADOOP_HOME is your hadoop root directory after unpack the binary package.
-export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
-
-./bin/sql-client.sh embedded -j .../hudi-flink1.1*-bundle-*.*.*.jar shell
+# For Flink versions: 1.13 - 1.17
+export FLINK_VERSION=1.17
+export HUDI_VERSION=0.14.0
+wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink${FLINK_VERSION}-bundle/${HUDI_VERSION}/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar -P $FLINK_HOME/lib/
+./bin/sql-client.sh embedded -j lib/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar shell
```
+
+### Flink Support Matrix
+
+
+| Hudi | Supported Flink version |
+|:----------------|:----------------------------------------|
+| 0.14.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x, 1.17.x |
+| 0.13.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x |
+| 0.12.x | 1.13.x, 1.14.x, 1.15.x |
+| 0.11.x | 1.13.x, 1.14.x |
+
+
<div className="notice--info">
<h4>Please note the following: </h4>
<ul>
@@ -95,47 +83,16 @@ The SQL CLI only executes the SQL line by line.
Hudi works with Flink 1.13, Flink 1.14, Flink 1.15, Flink 1.16 and Flink 1.17.
Please add the desired
dependency to your project:
```xml
-<!-- Flink 1.13 -->
-<dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-flink1.13-bundle</artifactId>
- <version>0.14.0</version>
-</dependency>
-```
-
-```xml
-<!-- Flink 1.14 -->
-<dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-flink1.14-bundle</artifactId>
- <version>0.14.0</version>
-</dependency>
-```
-
-```xml
-<!-- Flink 1.15 -->
-<dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-flink1.15-bundle</artifactId>
- <version>0.14.0</version>
-</dependency>
-```
-
-```xml
-<!-- Flink 1.16 -->
-<dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-flink1.16-bundle</artifactId>
- <version>0.14.0</version>
-</dependency>
-```
-
-```xml
-<!-- Flink 1.17 -->
+<!-- For Flink versions 1.13 - 1.17 -->
+<properties>
+ <flink.version>1.17.0</flink.version>
+ <flink.binary.version>1.17</flink.binary.version>
+ <hudi.version>0.14.0</hudi.version>
+</properties>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-flink1.17-bundle</artifactId>
- <version>0.14.0</version>
+ <artifactId>hudi-flink${flink.binary.version}-bundle</artifactId>
+ <version>${hudi.version}</version>
</dependency>
```
@@ -144,7 +101,9 @@ dependency to your project:
</Tabs
>
-### Insert Data
+## Create Table
+
+First, let's create a Hudi table. Here, we use a partitioned table for
illustration, but Hudi also supports non-partitioned tables.
<Tabs
defaultValue="flinksql"
@@ -156,36 +115,69 @@ values={[
<TabItem value="flinksql">
-Creates a Flink Hudi table first and insert data into the Hudi table using SQL
`VALUES` as below.
+Here is an example of creating a Flink Hudi table.
```sql
-- sets up the result mode to tableau to show the results directly in the CLI
set sql-client.execution.result-mode = tableau;
-
-CREATE TABLE t1(
- uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
- name VARCHAR(10),
- age INT,
- ts TIMESTAMP(3),
- `partition` VARCHAR(20)
+DROP TABLE IF EXISTS hudi_table;
+CREATE TABLE hudi_table(
+ ts BIGINT,
+ uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
+ rider VARCHAR(20),
+ driver VARCHAR(20),
+ fare DOUBLE,
+ city VARCHAR(20)
)
-PARTITIONED BY (`partition`)
+PARTITIONED BY (`city`)
WITH (
'connector' = 'hudi',
- 'path' = '${path}',
- 'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by
default is COPY_ON_WRITE
+ 'path' = 'file:///tmp/hudi_table',
+ 'table.type' = 'MERGE_ON_READ' -- creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);
+```
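
As noted above, partitioning is optional. A minimal sketch of the same table without partitioning (the table name and path here are just for illustration) simply omits the `PARTITIONED BY` clause:

```sql
-- a non-partitioned variant of the table above
CREATE TABLE hudi_unpartitioned_table(
  ts BIGINT,
  uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
  rider VARCHAR(20),
  driver VARCHAR(20),
  fare DOUBLE,
  city VARCHAR(20)
)
WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_unpartitioned_table',
  'table.type' = 'MERGE_ON_READ'
);
```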
+
+</TabItem>
+
+<TabItem value="dataStream">
+
+```java
+// Java
+// The first commit will auto-initialize the table if it does not exist in the specified base path.
+```
+
+</TabItem>
+
+</Tabs
+>
+
+## Insert Data
+
+<Tabs
+defaultValue="flinksql"
+values={[
+{ label: 'Flink SQL', value: 'flinksql', },
+{ label: 'DataStream API', value: 'dataStream', },
+]}
+>
+
+<TabItem value="flinksql">
+
+Insert data into the Hudi table created above using SQL `VALUES`, as below.
+
+```sql
-- insert data using values
-INSERT INTO t1 VALUES
- ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
- ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
- ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
- ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
- ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
- ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
- ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
- ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
+INSERT INTO hudi_table
+VALUES
+(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
+(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
+(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
+(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
+(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo'),
+(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai'),
+(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
```
</TabItem>
@@ -202,8 +194,8 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-String targetTable = "t1";
-String basePath = "file:///tmp/t1";
+String targetTable = "hudi_table";
+String basePath = "file:///tmp/hudi_table";
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
@@ -223,13 +215,15 @@ HoodiePipeline.Builder builder =
HoodiePipeline.builder(targetTable)
builder.sink(dataStream, false); // the second parameter indicates whether the input data stream is bounded
env.execute("Api_Sink");
+
+// Full Quickstart Example -
https://gist.github.com/ad1happy2go/1716e2e8aef6dcfe620792d6e6d86d36
```
</TabItem>
</Tabs
>
-### Query Data
+## Query Data
<Tabs
defaultValue="flinksql"
@@ -243,7 +237,7 @@ values={[
```sql
-- query from the Hudi table
-select * from t1;
+select * from hudi_table;
```
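
Standard SQL predicates and aggregations also work against the snapshot view. For example, assuming the `hudi_table` created earlier in this guide:

```sql
-- filter on the partition column and aggregate fares per city
SELECT city, COUNT(*) AS trip_count, SUM(fare) AS total_fare
FROM hudi_table
WHERE city = 'san_francisco'
GROUP BY city;
```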
</TabItem>
@@ -258,14 +252,14 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-String targetTable = "t1";
-String basePath = "file:///tmp/t1";
+String targetTable = "hudi_table";
+String basePath = "file:///tmp/hudi_table";
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(),
HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables streaming read
-options.put(FlinkOptions.READ_START_COMMIT.key(), "'20210316134557'"); //
specifies the start commit instant time
+options.put(FlinkOptions.READ_START_COMMIT.key(), "20210316134557"); //
specifies the start commit instant time
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("uuid VARCHAR(20)")
@@ -289,19 +283,13 @@ env.execute("Api_Source");
This statement queries snapshot view of the dataset.
Refers to [Table types and queries](/docs/concepts#table-types--queries) for
more info on all table types and query types supported.
-### Update Data
+## Update Data
This is similar to inserting new data.
```sql
--- this would update the record with primary key 'id1'
--- if the operation is defined as UPSERT
-insert into t1 values
- ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
-
--- this would update the specific records with constant age 19,
--- NOTE: only works for batch sql queries
-UPDATE t1 SET age=19 WHERE uuid in ('id1', 'id2');
+SET 'execution.runtime-mode' = 'batch';
+UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
```
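
Because the table declares a primary key, re-inserting a row with an existing `uuid` also updates it (upsert semantics), and unlike `UPDATE` this works in streaming mode too. A sketch reusing a key from the insert example above:

```sql
-- an INSERT with an existing key upserts the row instead of duplicating it
INSERT INTO hudi_table
VALUES (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',25.0,'san_francisco');
```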
Notice that `UPDATE` requires batch execution mode; in streaming pipelines, upserts
are expressed as inserts on the primary key instead.
@@ -313,17 +301,33 @@ The `UPDATE` statement is supported since Flink 1.17, so
only Hudi Flink bundle
Only **batch** queries on Hudi table with primary key work correctly.
:::
-### Streaming Query
+## Delete Data {#deletes}
-Hudi Flink also provides capability to obtain a stream of records that changed
since given commit timestamp.
-This can be achieved using Hudi's streaming querying and providing a start
time from which changes need to be streamed.
-We do not need to specify endTime, if we want all changes after the given
commit (as is the common case).
+### Row-level Delete
+When consuming data in a streaming query, the Hudi Flink source can also accept
change logs from the upstream data source if the `RowKind` is set up per row;
+it can then apply UPDATEs and DELETEs at row level. You can thereby sync a
near-real-time snapshot on Hudi for all kinds
+of RDBMS sources.
+
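
As a sketch of how this can look in SQL (the `mysql-cdc` connector and its options here are illustrative assumptions requiring the separate Flink CDC connectors, and are not part of this guide), a changelog-producing source can be piped into a Hudi table with changelog mode enabled so that upstream deletes are applied row by row:

```sql
-- hypothetical upstream CDC source; any connector emitting RowKind changelogs works
CREATE TABLE orders_cdc(
  uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
  fare DOUBLE
) WITH (
  'connector' = 'mysql-cdc',   -- illustrative; needs the flink-cdc connector jar
  'hostname' = 'localhost',
  'username' = 'demo',
  'password' = 'demo',
  'database-name' = 'demo',
  'table-name' = 'orders'
);

CREATE TABLE orders_hudi(
  uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
  fare DOUBLE
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/orders_hudi',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true' -- keep -U/-D records so deletes apply at row level
);

-- DELETEs upstream arrive as RowKind.DELETE and remove the matching rows in Hudi
INSERT INTO orders_hudi SELECT * FROM orders_cdc;
```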
+### Batch Delete
+
+```sql
+-- delete all the records with fare greater than 30
+-- NOTE: only works for batch sql queries
+SET 'execution.runtime-mode' = 'batch';
+DELETE FROM hudi_table WHERE fare > 30.0;
+```
:::note
-The bundle jar with **hive profile** is needed for streaming query, by default
the officially released flink bundle is built **without**
-**hive profile**, the jar needs to be built manually, see [Build Flink Bundle
Jar](/docs/syncing_metastore#install) for more details.
+The `DELETE` statement is supported since Flink 1.17, so only Hudi Flink
bundle compiled with Flink 1.17+ supplies this functionality.
+Only **batch** queries on Hudi table with primary key work correctly.
:::
+## Streaming Query
+
+Hudi Flink also provides the capability to obtain a stream of records that
changed since a given commit timestamp.
+This can be achieved using Hudi's streaming query and providing a start time
from which changes need to be streamed.
+We do not need to specify an end time if we want all changes after the given
commit (as is the common case).
+
```sql
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
@@ -346,31 +350,66 @@ WITH (
select * from t1;
```
-This will give all changes that happened after the `read.start-commit` commit.
The unique thing about this
-feature is that it now lets you author streaming pipelines on streaming or
batch data source.
-### Delete Data {#deletes}
+:::info Key requirements
+The bundle jar with the **hive profile** is needed for streaming query. By
default, the officially released Flink bundle is built **without** the
+**hive profile**, so the jar needs to be built manually; see [Build Flink
Bundle Jar](/docs/syncing_metastore#install) for more details.
+:::
-#### Row-level Delete
-When consuming data in streaming query, Hudi Flink source can also accept the
change logs from the upstream data source if the `RowKind` is set up per-row,
-it can then apply the UPDATE and DELETE in row level. You can then sync a
NEAR-REAL-TIME snapshot on Hudi for all kinds
-of RDBMS.
+## Change Data Capture Query
-#### Batch Delete
+Hudi Flink also provides the capability to obtain a stream of records with
Change Data Capture (CDC).
+CDC queries are useful for applications that need all the changes, along with
the before/after images of records.
```sql
--- delete all the records with age greater than 23
--- NOTE: only works for batch sql queries
-DELETE FROM t1 WHERE age > 23;
-```
+set sql-client.execution.result-mode = tableau;
-:::note
-The `DELETE` statement is supported since Flink 1.17, so only Hudi Flink
bundle compiled with Flink 1.17+ supplies this functionality.
-Only **batch** queries on Hudi table with primary key work correctly.
-:::
+CREATE TABLE hudi_table(
+ ts BIGINT,
+ uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
+ rider VARCHAR(20),
+ driver VARCHAR(20),
+ fare DOUBLE,
+ city VARCHAR(20)
+)
+PARTITIONED BY (`city`)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = 'file:///tmp/hudi_table',
+ 'table.type' = 'MERGE_ON_READ',
+ 'changelog.enabled' = 'true', -- this option enables the change log
+ 'cdc.enabled' = 'true' -- this option enables the cdc log
+);
+-- insert data using values
+INSERT INTO hudi_table
+VALUES
+(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
+(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
+(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
+(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
+(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo'),
+(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai'),
+(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
+SET 'execution.runtime-mode' = 'batch';
+UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
+-- Query the table in stream mode in another shell to see change logs
+SET 'execution.runtime-mode' = 'streaming';
+select * from hudi_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
+```
+
+This will give all the changes that happened on the table, including the
before/after images of updated records. The unique thing about this
+feature is that it lets you author streaming pipelines on a streaming or
batch data source.
## Where To Go From Here?
-Check out the [Flink Setup](/docs/next/flink_tuning) how-to page for deeper
dive into configuration settings.
+- **Quick Start** : Use the [Setup](#setup) section above to get started
quickly with the Flink SQL client for writing to (and reading from) Hudi.
+- **Configuration** : [Global
Configuration](/docs/next/flink_tuning#global-configurations) is set up through
`$FLINK_HOME/conf/flink-conf.yaml`; per-job configuration is set up through
[Table Options](/docs/next/flink_tuning#table-options).
+- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](/docs/hoodie_streaming_ingestion#cdc-ingestion), [Bulk
Insert](/docs/hoodie_streaming_ingestion#bulk-insert), [Index
Bootstrap](/docs/hoodie_streaming_ingestion#index-bootstrap), [Changelog
Mode](/docs/hoodie_streaming_ingestion#changelog-mode) and [Append
Mode](/docs/hoodie_streaming_ingestion#append-mode).
+- **Querying Data** : Flink supports different modes for reading, such as
[Streaming Query](/docs/querying_data#streaming-query) and [Incremental
Query](/docs/querying_data#incremental-query).
+- **Tuning** : For write/read tasks, this guide gives some tuning suggestions,
such as [Memory Optimization](/docs/next/flink_tuning#memory-optimization) and
[Write Rate Limit](/docs/next/flink_tuning#write-rate-limit).
+- **Optimization**: Offline compaction is supported; see [Offline
Compaction](/docs/compaction#flink-offline-compaction).
+- **Query Engines**: Besides Flink, many other engines are integrated, such as
[Hive Query](/docs/syncing_metastore#flink-setup) and [Presto
Query](/docs/querying_data#prestodb).
+- **Catalog**: A Hudi-specific catalog is supported: [Hudi
Catalog](/docs/querying_data/#hudi-catalog).
If you are relatively new to Apache Hudi, it is important to be familiar with
a few core concepts:
- [Hudi Timeline](/docs/next/timeline) – How Hudi manages transactions and
other table services