This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 72b01a53d3d [HUDI-6330][DOCS] Update user doc to show how to use consistent bucket index for Flink engine (#10977)
72b01a53d3d is described below
commit 72b01a53d3d22a51e9210b4b69f368e7388821e4
Author: Jing Zhang <[email protected]>
AuthorDate: Tue Apr 9 00:18:41 2024 +0800
[HUDI-6330][DOCS] Update user doc to show how to use consistent bucket index for Flink engine (#10977)
---
website/docs/sql_dml.md | 80 ++++++++++++++++++++++++++++++++++++--
website/releases/release-0.14.0.md | 4 +-
2 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 90576dcb0e0..edb63730b13 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -323,12 +323,15 @@ In the below example, we have two streaming ingestion pipelines that concurrentl
pipeline is responsible for the compaction and cleaning table services, while the other pipeline is just for data
ingestion.
-```sql
+To commit the dataset, checkpointing needs to be enabled; here is an example configuration for flink-conf.yaml:
+```yaml
-- set the interval as 30 seconds
execution.checkpointing.interval: 30000
state.backend: rocksdb
+```
--- This is a datagen source that can generates records continuously
+```sql
+-- This is a datagen source that can generate records continuously
CREATE TABLE sourceT (
uuid varchar(20),
name varchar(10),
@@ -349,7 +352,7 @@ CREATE TABLE t1(
`partition` varchar(20)
) WITH (
'connector' = 'hudi',
- 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+ 'path' = '${work_path}/hudi-demo/t1',
'table.type' = 'MERGE_ON_READ',
'index.type' = 'BUCKET',
'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
@@ -365,7 +368,7 @@ CREATE TABLE t1_2(
`partition` varchar(20)
) WITH (
'connector' = 'hudi',
- 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+ 'path' = '${work_path}/hudi-demo/t1',
'table.type' = 'MERGE_ON_READ',
'index.type' = 'BUCKET',
'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
@@ -390,3 +393,72 @@ and `clean.async.enabled` options are used to disable the compaction and cleanin
This is done to ensure that the compaction and cleaning services are not executed twice for the same table.
+### Consistent hashing index (Experimental)
+
+We have introduced the Consistent Hashing Index since the [0.13.0 release](/releases/release-0.13.0#consistent-hashing-index). In comparison to the static hashing index ([Bucket Index](/releases/release-0.11.0#bucket-index)), the consistent hashing index offers dynamic scalability of data buckets for the writer.
+You can find the [RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md) for the design of this feature.
+In the 0.13.X release, the Consistent Hashing Index is supported only for the Spark engine. Since [release 0.14.0](/releases/release-0.14.0#consistent-hashing-index-support), the index is also supported for the Flink engine.
+
+To utilize this feature, configure the option `index.type` as `BUCKET` and set `hoodie.index.bucket.engine` to `CONSISTENT_HASHING`.
+When enabling the consistent hashing index, it's important to enable clustering scheduling within the writer. During this process, the writer will perform dual writes for both the old and new data buckets while the clustering is pending. Although the dual write does not impact correctness, it is strongly recommended to execute clustering as quickly as possible.
+
+In the below example, we will create a datagen source and do streaming ingestion into a Hudi table with the consistent bucket index. To commit the dataset, checkpointing needs to be enabled; here is an example configuration for flink-conf.yaml:
+```yaml
+# set the checkpoint interval to 30 seconds
+execution.checkpointing.interval: 30000
+state.backend: rocksdb
+```
+
+```sql
+-- This is a datagen source that can generate records continuously
+CREATE TABLE sourceT (
+ uuid varchar(20),
+ name varchar(10),
+ age int,
+ ts timestamp(3),
+ `partition` as 'par1'
+) WITH (
+ 'connector' = 'datagen',
+ 'rows-per-second' = '200'
+);
+
+-- Create the hudi table with consistent bucket index
+CREATE TABLE t1(
+ uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+ 'connector'='hudi',
+ 'path' = '${work_path}/hudi-demo/hudiT',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'BUCKET',
+ 'clustering.schedule.enabled'='true',
+ 'hoodie.index.bucket.engine'='CONSISTENT_HASHING',
+  'hoodie.clustering.plan.strategy.class'='org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy',
+  'hoodie.clustering.execution.strategy.class'='org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy',
+ 'hoodie.bucket.index.num.buckets'='8',
+ 'hoodie.bucket.index.max.num.buckets'='128',
+ 'hoodie.bucket.index.min.num.buckets'='8',
+ 'hoodie.bucket.index.split.threshold'='1.5',
+ 'write.tasks'='2'
+);
+
+-- submit the pipelines
+insert into t1 select * from sourceT;
+
+select * from t1 limit 20;
+```
+
+:::caution
+Consistent Hashing Index is supported for the Flink engine since [release 0.14.0](/releases/release-0.14.0#consistent-hashing-index-support), and there are some limitations to its use as of 0.14.0:
+
+- This index is supported only for MOR tables. This limitation also exists when using the Spark engine.
+- It does not work with the metadata table enabled. This limitation also exists when using the Spark engine.
+- The consistent hashing index does not work with bulk-insert using the Flink engine yet; please use the simple bucket index or the Spark engine for bulk-insert pipelines.
+- The resize plan generated by the Flink engine only supports merging small file groups; file splitting is not supported yet.
+- The resize plan should be executed through an offline Spark job. The Flink engine does not support executing the resize plan yet.
+:::
\ No newline at end of file
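+
+Since the resize plan must be executed by an offline Spark job, a hypothetical invocation using the `HoodieClusteringJob` utility is sketched below. The bundle path, properties file, and table name are placeholders, and the flags should be checked against the clustering documentation for your release:
+```shell
+# Schedule and execute the pending clustering (resize) plan offline with Spark.
+# All paths and names below are illustrative placeholders.
+spark-submit \
+  --class org.apache.hudi.utilities.HoodieClusteringJob \
+  /path/to/hudi-utilities-bundle.jar \
+  --props /path/to/clusteringjob.properties \
+  --mode scheduleAndExecute \
+  --base-path ${work_path}/hudi-demo/hudiT \
+  --table-name hudiT \
+  --spark-memory 1g
+```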
diff --git a/website/releases/release-0.14.0.md b/website/releases/release-0.14.0.md
index 266db7c0c2b..71b8cc9a934 100644
--- a/website/releases/release-0.14.0.md
+++ b/website/releases/release-0.14.0.md
@@ -289,8 +289,8 @@ In comparison to the static hashing index (BUCKET index), the consistent hashing
data buckets for the writer. To utilize this feature, configure the option `index.type` as `BUCKET` and set
`hoodie.index.bucket.engine` to `CONSISTENT_HASHING`.
-When enabling the consistent hashing index, it's important to activate asynchronous clustering scheduling within the writer.
-The clustering plan should be executed through an offline job. During this process, the writer will perform dual writes
+When enabling the consistent hashing index, it's important to activate clustering scheduling within the writer.
+The clustering plan should be executed through an offline Spark job. During this process, the writer will perform dual writes
for both the old and new data buckets while the clustering is pending. Although the dual write does not impact correctness,
it is strongly recommended to execute clustering as quickly as possible.