This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new bc28ca24b9d [HUDI-7089] Add docs for new features in 1.0.0-beta (#10087)
bc28ca24b9d is described below
commit bc28ca24b9d7c859638f015816b3cd8cf81a670a
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Nov 15 12:28:27 2023 +0530
[HUDI-7089] Add docs for new features in 1.0.0-beta (#10087)
---
website/docs/concurrency_control.md | 27 +++++-
website/docs/flink-quick-start-guide.md | 2 +-
website/docs/sql_ddl.md | 146 +++++++++++++++++++++++++++++-
website/docs/timeline.md | 55 +++++++++--
website/docs/writing_data.md | 78 +++++++++++++++-
website/static/assets/images/lsm_tree.png | Bin 0 -> 108638 bytes
6 files changed, 291 insertions(+), 17 deletions(-)
diff --git a/website/docs/concurrency_control.md
b/website/docs/concurrency_control.md
index 750a9fee1ff..54d6408320d 100644
--- a/website/docs/concurrency_control.md
+++ b/website/docs/concurrency_control.md
@@ -10,6 +10,13 @@ Concurrency control defines how different writers/readers
coordinate access to t
In this section, we will discuss the different concurrency controls supported
by Hudi and how they are leveraged to enable flexible deployment models. We
will then cover multi-writing, a popular deployment model, and finally describe
ways to ingest data into a Hudi table from multiple writers, such as Hudi
Streamer, the Hudi datasource, Spark Structured Streaming and Spark SQL.
+:::note
+Hudi has introduced a new concurrency mode, `NON_BLOCKING_CONCURRENCY_CONTROL`,
where, unlike OCC, multiple writers can
+operate on the table with non-blocking conflict resolution. The writers can
write into the same file group, with
+conflicts resolved automatically by the query reader and the compactor. The
new concurrency mode is currently
+available for preview in version 1.0.0-beta only. You can read more about it
under the section [Model C: Multi-writer](#model-c-multi-writer).
+:::
+
## Deployment models with supported concurrency controls
@@ -61,6 +68,16 @@ With multiple writers using OCC, these are the write
guarantees to expect:
- *INCREMENTAL PULL Guarantee*: Data consumption and checkpoints are NEVER out
of order. If there are inflight commits
(due to multi-writing), incremental queries will not expose the completed
commits following the inflight commits.
+#### Non-Blocking Concurrency Control Mode (Experimental)
+
+`NON_BLOCKING_CONCURRENCY_CONTROL` offers the same set of guarantees as OCC,
but without
+explicit locks for serializing the writes. A lock is only needed for writing
the commit metadata to the Hudi timeline. The
+completion time of a commit reflects the serialization order, and file
slicing is done based on completion time.
+Multiple writers can operate on the table with non-blocking conflict
resolution. The writers can write into the same
+file group, with conflicts resolved automatically by the query reader and the
compactor. The new concurrency mode is
+currently available for preview in version 1.0.0-beta only, with the caveat
that conflict resolution is not yet supported
+between clustering and ingestion. It does work for compaction and ingestion;
see an example with Flink
+writers [here](/docs/next/writing_data#non-blocking-concurrency-control).
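The completion-time ordering can be illustrated with a small Python sketch. This is a toy model for illustration only; the `Commit` class and `serialization_order` helper are hypothetical, not Hudi APIs.

```python
# Toy model of non-blocking concurrency control ordering. Illustrative only;
# Hudi's actual timeline and merge logic are far more involved.
from dataclasses import dataclass

@dataclass
class Commit:
    writer: str
    begin_time: int       # instant time when the write started
    completion_time: int  # instant time when commit metadata was written

def serialization_order(commits):
    # Under NBCC, completion time (not begin time) defines the order in
    # which concurrent writes are merged by readers and the compactor.
    return sorted(commits, key=lambda c: c.completion_time)

# Writer A starts first but finishes last, so writer B's commit orders first.
commits = [
    Commit("writer-A", begin_time=100, completion_time=130),
    Commit("writer-B", begin_time=110, completion_time=120),
]
print([c.writer for c in serialization_order(commits)])
```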
## Enabling Multi Writing
@@ -72,11 +89,11 @@ hoodie.write.lock.provider=<lock-provider-classname>
hoodie.cleaner.policy.failed.writes=LAZY
```
-| Config Name | Default
| Description
[...]
-|-------------------------------------|-------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| hoodie.write.concurrency.mode | SINGLE_WRITER (Optional)
| <u>[Concurrency
modes](https://github.com/apache/hudi/blob/c387f2a6dd3dc9db2cd22ec550a289d3a122e487/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java)</u>
for write operations.<br />Possible values:<br /><ul><li>`SINGLE_WRITER`: Only
one active writer to the table. Maximizes
throughput.</li><li>`OPTIMISTIC_CONCURRENCY_CONTROL`: Multiple wr [...]
-| hoodie.write.lock.provider |
org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider (Optional) |
Lock provider class name, user can provide their own implementation of
LockProvider which should be subclass of
org.apache.hudi.common.lock.LockProvider<br /><br />`Config Param:
LOCK_PROVIDER_CLASS_NAME`<br />`Since Version: 0.8.0`
[...]
-| hoodie.cleaner.policy.failed.writes | EAGER (Optional)
|
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy: Policy that
controls how to clean up failed writes. Hudi will delete any files written by
failed writes to re-claim space. EAGER(default): Clean failed writes inline
after every write operation. LAZY: Clean failed writes lazily after
heartbeat timeout when the cleaning service runs. This policy is re [...]
+| Config Name | Default
| Description
[...]
+|-------------------------------------|-------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| hoodie.write.concurrency.mode | SINGLE_WRITER (Optional)
| <u>[Concurrency
modes](https://github.com/apache/hudi/blob/00ece7bce0a4a8d0019721a28049723821e01842/hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java)</u>
for write operations.<br />Possible values:<br /><ul><li>`SINGLE_WRITER`: Only
one active writer to the table. Maximizes
throughput.</li><li>`OPTIMISTIC_CONCURRENCY_CONTROL`: Multiple wr [...]
+| hoodie.write.lock.provider |
org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider (Optional) |
Lock provider class name, user can provide their own implementation of
LockProvider which should be subclass of
org.apache.hudi.common.lock.LockProvider<br /><br />`Config Param:
LOCK_PROVIDER_CLASS_NAME`<br />`Since Version: 0.8.0`
[...]
+| hoodie.cleaner.policy.failed.writes | EAGER (Optional)
|
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy: Policy that
controls how to clean up failed writes. Hudi will delete any files written by
failed writes to re-claim space. EAGER(default): Clean failed writes inline
after every write operation. LAZY: Clean failed writes lazily after
heartbeat timeout when the cleaning service runs. This policy is re [...]
### External Locking and lock providers
diff --git a/website/docs/flink-quick-start-guide.md
b/website/docs/flink-quick-start-guide.md
index cfba29ad8db..e67e16c5816 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -404,7 +404,7 @@ feature is that it now lets you author streaming pipelines
on streaming or batch
## Where To Go From Here?
- **Quick Start** : Read [Quick Start](#quick-start) to get started quickly
with the Flink SQL client to write to (and read from) Hudi.
- **Configuration** : For [Global
Configuration](/docs/next/flink_tuning#global-configurations), set up through
`$FLINK_HOME/conf/flink-conf.yaml`. For per-job configuration, set up through
[Table Option](/docs/next/flink_tuning#table-options).
-- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](/docs/hoodie_streaming_ingestion#cdc-ingestion), [Bulk
Insert](/docs/hoodie_streaming_ingestion#bulk-insert), [Index
Bootstrap](/docs/hoodie_streaming_ingestion#index-bootstrap), [Changelog
Mode](/docs/hoodie_streaming_ingestion#changelog-mode) and [Append
Mode](/docs/hoodie_streaming_ingestion#append-mode).
+- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](/docs/hoodie_streaming_ingestion#cdc-ingestion), [Bulk
Insert](/docs/hoodie_streaming_ingestion#bulk-insert), [Index
Bootstrap](/docs/hoodie_streaming_ingestion#index-bootstrap), [Changelog
Mode](/docs/hoodie_streaming_ingestion#changelog-mode) and [Append
Mode](/docs/hoodie_streaming_ingestion#append-mode). Flink also supports
multiple streaming writers with [non-blocking concurrency control](/docs/ [...]
- **Querying Data** : Flink supports different modes for reading, such as
[Streaming Query](/docs/querying_data#streaming-query) and [Incremental
Query](/docs/querying_data#incremental-query).
- **Tuning** : For write/read tasks, this guide gives some tuning suggestions,
such as [Memory Optimization](/docs/next/flink_tuning#memory-optimization) and
[Write Rate Limit](/docs/next/flink_tuning#write-rate-limit).
- **Optimization**: Offline compaction is supported; see [Offline
Compaction](/docs/compaction#flink-offline-compaction).
diff --git a/website/docs/sql_ddl.md b/website/docs/sql_ddl.md
index d50b874d606..e3d4ad3419f 100644
--- a/website/docs/sql_ddl.md
+++ b/website/docs/sql_ddl.md
@@ -178,6 +178,144 @@ TBLPROPERTIES (
AS SELECT * FROM parquet_table;
```
+### Create Index (Experimental)
+
+Hudi supports creating and dropping indexes, including functional indexes, on
a table.
+
+:::note
+Creating indexes through SQL is in preview in version 1.0.0-beta only. It will
be generally available in version 1.0.0.
+Please report any issues you find either via [GitHub
issues](https://github.com/apache/hudi/issues) or by creating a
[JIRA](https://issues.apache.org/jira/projects/HUDI/issues).
+:::
+
+```sql
+-- Create Index
+CREATE INDEX [IF NOT EXISTS] index_name ON [TABLE] table_name
+[USING index_type]
+(column_name1 [OPTIONS(key1=value1, key2=value2, ...)], column_name2
[OPTIONS(key1=value1, key2=value2, ...)], ...)
+[OPTIONS (key1=value1, key2=value2, ...)]
+
+-- Drop Index
+DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
+```
+
+- `index_name` is the name of the index to be created or dropped.
+- `table_name` is the name of the table on which the index is created or
dropped.
+- `index_type` is the type of the index to be created. Currently, only
`files`, `column_stats` and `bloom_filters` are supported.
+- `column_name` is the name of the column on which the index is created.
+- Both the index and the column on which the index is created can be qualified
with options in the form of key-value pairs.
+  We will see this in the functional index example below.
+
+#### Create Functional Index
+
+A [functional
index](https://github.com/apache/hudi/blob/00ece7bce0a4a8d0019721a28049723821e01842/rfc/rfc-63/rfc-63.md)
+is an index on a function of a column. It is a new addition to Hudi's
[multi-modal
indexing](https://hudi.apache.org/blog/2022/05/17/Introducing-Multi-Modal-Index-for-the-Lakehouse-in-Apache-Hudi)
+subsystem, which provides a faster access method and also absorbs partitioning
into the indexing system. Let us look at
+some examples of creating a functional index.
+
+```sql
+-- Create a functional index on the column `ts` (unix epoch) of the table
`hudi_table` using the function `from_unixtime` with the format `yyyy-MM-dd`
+CREATE INDEX IF NOT EXISTS ts_datestr ON hudi_table USING column_stats(ts)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd');
+-- Create a functional index on the column `ts` (timestamp in yyyy-MM-dd
HH:mm:ss) of the table `hudi_table` using the function `hour`
+CREATE INDEX ts_hour ON hudi_table USING column_stats(ts) options(func='hour');
+```
+
+A few things to note:
+- The `func` option is required for creating a functional index, and it should
be a valid Spark SQL function. Currently,
+ only the functions that take a single column as input are supported. Some
useful functions that are supported are listed below.
+ - `identity`
+ - `from_unixtime`
+ - `date_format`
+ - `to_date`
+ - `to_timestamp`
+ - `year`
+ - `month`
+ - `day`
+ - `hour`
+ - `lower`
+ - `upper`
+ - `substring`
+ - `regexp_extract`
+ - `regexp_replace`
+ - `concat`
+ - `length`
+- Please check the syntax for the above functions in
+ the [Spark SQL
documentation](https://spark.apache.org/docs/latest/sql-ref-functions.html) and
provide the options
accordingly. For example, the `format` option is required for
the `from_unixtime` function.
+- UDFs are not supported.
+
+<details>
+ <summary>Example of creating and using functional index</summary>
+
+```sql
+-- create a Hudi table
+CREATE TABLE hudi_table_func_index (
+ ts STRING,
+ uuid STRING,
+ rider STRING,
+ driver STRING,
+ fare DOUBLE,
+ city STRING
+) USING HUDI
+tblproperties (primaryKey = 'uuid')
+PARTITIONED BY (city)
+location 'file:///tmp/hudi_table_func_index';
+
+-- disable small file handling so that each insert creates a new file --
+set hoodie.parquet.small.file.limit=0;
+
+INSERT INTO hudi_table_func_index VALUES ('2023-09-20
03:58:59','334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco');
+INSERT INTO hudi_table_func_index VALUES ('2023-09-19
08:46:34','e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco');
+INSERT INTO hudi_table_func_index VALUES ('2023-09-18
17:45:31','9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco');
+INSERT INTO hudi_table_func_index VALUES ('2023-09-22
13:12:56','1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco');
+INSERT INTO hudi_table_func_index VALUES ('2023-09-24
06:15:45','e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo');
+INSERT INTO hudi_table_func_index VALUES ('2023-09-22
15:21:36','7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40
,'sao_paulo');
+INSERT INTO hudi_table_func_index VALUES ('2023-09-20
12:35:45','3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06
,'chennai');
+INSERT INTO hudi_table_func_index VALUES ('2023-09-19
05:34:56','c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
+
+-- Query with the hour function filter, but no index yet --
+spark-sql> SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE
city NOT IN ('chennai') AND hour(ts) > 12;
+san_francisco 93.5 rider-E driver-O
+san_francisco 33.9 rider-D driver-L
+sao_paulo 43.4 rider-G driver-Q
+Time taken: 0.208 seconds, Fetched 3 row(s)
+
+spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM
hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
+== Optimized Logical Plan ==
+Project [city#3465, fare#3464, rider#3462, driver#3463],
Statistics(sizeInBytes=899.5 KiB)
++- Filter ((isnotnull(city#3465) AND isnotnull(ts#3460)) AND (NOT (city#3465 =
chennai) AND (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12))),
Statistics(sizeInBytes=2.5 MiB)
+ +- Relation
default.hudi_table_func_index[_hoodie_commit_time#3455,_hoodie_commit_seqno#3456,_hoodie_record_key#3457,_hoodie_partition_path#3458,_hoodie_file_name#3459,ts#3460,uuid#3461,rider#3462,driver#3463,fare#3464,city#3465]
parquet, Statistics(sizeInBytes=2.5 MiB)
+
+== Physical Plan ==
+*(1) Project [city#3465, fare#3464, rider#3462, driver#3463]
++- *(1) Filter (isnotnull(ts#3460) AND (hour(cast(ts#3460 as timestamp),
Some(Asia/Kolkata)) > 12))
+ +- *(1) ColumnarToRow
+ +- FileScan parquet
default.hudi_table_func_index[ts#3460,rider#3462,driver#3463,fare#3464,city#3465]
Batched: true, DataFilters: [isnotnull(ts#3460), (hour(cast(ts#3460 as
timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location:
HoodieFileIndex(1 paths)[file:/tmp/hudi_table_func_index], PartitionFilters:
[isnotnull(city#3465), NOT (city#3465 = chennai)], PushedFilters:
[IsNotNull(ts)], ReadSchema:
struct<ts:string,rider:string,driver:string,fare:double>
+
+
+-- create the functional index --
+CREATE INDEX ts_hour ON hudi_table_func_index USING column_stats(ts)
options(func='hour');
+
+-- query after creating the index --
+spark-sql> SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE
city NOT IN ('chennai') AND hour(ts) > 12;
+san_francisco 93.5 rider-E driver-O
+san_francisco 33.9 rider-D driver-L
+sao_paulo 43.4 rider-G driver-Q
+Time taken: 0.202 seconds, Fetched 3 row(s)
+spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM
hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
+== Optimized Logical Plan ==
+Project [city#2970, fare#2969, rider#2967, driver#2968],
Statistics(sizeInBytes=449.8 KiB)
++- Filter ((isnotnull(city#2970) AND isnotnull(ts#2965)) AND (NOT (city#2970 =
chennai) AND (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12))),
Statistics(sizeInBytes=1278.3 KiB)
+ +- Relation
default.hudi_table_func_index[_hoodie_commit_time#2960,_hoodie_commit_seqno#2961,_hoodie_record_key#2962,_hoodie_partition_path#2963,_hoodie_file_name#2964,ts#2965,uuid#2966,rider#2967,driver#2968,fare#2969,city#2970]
parquet, Statistics(sizeInBytes=1278.3 KiB)
+
+== Physical Plan ==
+*(1) Project [city#2970, fare#2969, rider#2967, driver#2968]
++- *(1) Filter (isnotnull(ts#2965) AND (hour(cast(ts#2965 as timestamp),
Some(Asia/Kolkata)) > 12))
+ +- *(1) ColumnarToRow
+ +- FileScan parquet
default.hudi_table_func_index[ts#2965,rider#2967,driver#2968,fare#2969,city#2970]
Batched: true, DataFilters: [isnotnull(ts#2965), (hour(cast(ts#2965 as
timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location:
HoodieFileIndex(1 paths)[file:/tmp/hudi_table_func_index], PartitionFilters:
[isnotnull(city#2970), NOT (city#2970 = chennai)], PushedFilters:
[IsNotNull(ts)], ReadSchema:
struct<ts:string,rider:string,driver:string,fare:double>
+
+```
+</details>
+
### Setting Hudi configs
There are different ways you can pass the configs for a given hudi table.
@@ -237,8 +375,12 @@ Users can set table properties while creating a table. The
important table prope
:::
#### Passing Lock Providers for Concurrent Writers
-Hudi requires a lock provider to support concurrent writers or asynchronous
table services. Users can pass these table
-properties into *TBLPROPERTIES* as well. Below is an example for a Zookeeper
based configuration.
+
+Hudi requires a lock provider to support concurrent writers or asynchronous
table services when using the OCC
+or
[NBCC](/docs/next/concurrency_control#non-blocking-concurrency-control-mode-experimental)
(Non-Blocking Concurrency Control)
+concurrency modes. In NBCC mode, locking is only used to write the commit
metadata file to the timeline, and writes are
+serialized by completion time. Users can pass these table properties via
*TBLPROPERTIES* as well. Below is an example
+of a Zookeeper-based configuration.
```sql
-- Properties to use Lock configurations to support Multi Writers
diff --git a/website/docs/timeline.md b/website/docs/timeline.md
index 10c749bb994..4ac64ef3a95 100644
--- a/website/docs/timeline.md
+++ b/website/docs/timeline.md
@@ -34,6 +34,11 @@ in one of the following states
* `INFLIGHT` - Denotes that the action is currently being performed
* `COMPLETED` - Denotes completion of an action on the timeline
+All the actions in requested/inflight states are stored in the active timeline
as files named
+**_<begin\_instant\_time>.<action\_type>.<requested|inflight>_**. Completed
actions are stored along with a time that
+denotes when the action was completed, in a file named
+**_<begin\_instant\_time>\_<completion\_instant\_time>.<action\_type>_**.
+
<figure>
<img className="docimage"
src={require("/assets/images/hudi_timeline.png").default}
alt="hudi_timeline.png" />
</figure>
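The naming scheme above can be parsed mechanically. Below is a minimal Python sketch of such a parser; `parse_instant` is a hypothetical helper for illustration, not part of Hudi, and the instant-time values are made up.

```python
# Parse active-timeline file names per the scheme described above:
#   pending:   <begin_instant_time>.<action_type>.<requested|inflight>
#   completed: <begin_instant_time>_<completion_instant_time>.<action_type>
def parse_instant(filename: str) -> dict:
    parts = filename.split(".")
    if len(parts) == 3:  # requested or inflight instant
        begin, action, state = parts
        return {"begin": begin, "completion": None,
                "action": action, "state": state.upper()}
    begin, _, completion = parts[0].partition("_")
    return {"begin": begin, "completion": completion,
            "action": parts[1], "state": "COMPLETED"}

print(parse_instant("20231115122827000.commit.requested"))
print(parse_instant("20231115122827000_20231115122901000.commit"))
```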
@@ -54,26 +59,60 @@ after certain thresholds the archival kicks in to move
older timeline events to
timeline is never contacted for regular operations of the table and is merely
used for book-keeping and debugging purposes.
Any instants seen under the “.hoodie” directory refer to the active timeline,
and those archived go into the “.hoodie/archived” folder.
+#### LSM Timeline
+
+As mentioned above, the active timeline keeps a limited history to stay fast,
while the archived timeline is expensive to access
+during reads or writes, especially with high write throughput. To overcome
this limitation, Hudi introduced an
+LSM (log-structured merge) tree based timeline. Completed actions, their plans
and completion metadata are stored in a more
+scalable LSM tree based archived timeline, organized in an **_archived_**
storage location under the `.hoodie` metadata
+path. The new timeline format is enabled by default, and going forward we will
refer to the archived timeline as the LSM
+timeline. It consists of Apache Parquet files with action instant data and
bookkeeping metadata files, organized in the following
+manner.
+
+```bash
+/.hoodie/archived/
+├── _version_ <-- stores the
manifest version that is current
+├── manifest_1 <-- manifests store list of files
in timeline
+├── manifest_2 <-- compactions, cleaning, writes
produce new manifest files
+├── ...
+├── manifest_<N> <-- there can be many manifest
files at any given time
+├── <min_time>_<max_time>_<level>.parquet <-- files storing actual action
details
+```
+
+One can read more about the details of the LSM timeline in the Hudi 1.0 specs.
To understand it better, here is an example.
+
+<figure>
+ <img className="docimage"
src={require("/assets/images/lsm_tree.png").default} alt="lsm_tree.png" />
+</figure>
+
+In the above figure, each level is a tree sorted by instant times. We can see
that, for a batch of commits, the metadata
+is stored in a parquet file. As more commits accumulate, they get compacted
and pushed down to lower levels
+of the tree. Each new operation on the timeline yields a new snapshot version.
The advantage of such a structure is that
+we can keep the top level in memory, and still load the remaining levels
efficiently from disk if we need to walk
+back over a longer history. The LSM timeline compaction frequency is
controlled by `hoodie.timeline.compaction.batch.size`, i.e.
+for every _N_ parquet files in the current level, they are merged and flushed
as a compacted file in the next level.
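The pushdown behavior can be sketched in a few lines of Python. This is a toy model of the batching rule only; `compact` is hypothetical and does not mirror Hudi's actual implementation.

```python
# Toy model of LSM timeline compaction: once a level accumulates
# `batch_size` files, they are merged into a single file at the next
# level. Files are modeled as (min_time, max_time) instant ranges.
def compact(levels, batch_size):
    level = 0
    while level < len(levels):
        while len(levels[level]) >= batch_size:
            batch = levels[level][:batch_size]
            levels[level] = levels[level][batch_size:]
            if level + 1 == len(levels):
                levels.append([])
            # The merged file spans the full instant range of the batch.
            levels[level + 1].append((batch[0][0], batch[-1][1]))
        level += 1
    return levels

# Ten single-instant files at level 0 with batch size 10 collapse into
# one file at level 1.
print(compact([[(t, t) for t in range(10)]], batch_size=10))
```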
### Archival Configs
Basic configurations that control archival.
#### Spark write client configs
-| Config Name
| Default | Description
|
-|--------------------------------------------------------------------------------------------|
------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
-| hoodie.keep.max.commits | 30 (Optional) | Archiving service moves older
entries from timeline into an archived log after each write, to keep the
metadata overhead constant, even as the table size grows. This config controls
the maximum number of instants to retain in the active timeline. |
-| hoodie.keep.min.commits | 20 (Optional) | Similar to
hoodie.keep.max.commits, but controls the minimum number of instants to retain
in the active timeline.
|
+| Config Name | Default | Description
|
+|---------------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.keep.max.commits | 30 (Optional) | Archiving service
moves older entries from timeline into an archived log after each write, to
keep the metadata overhead constant, even as the table size grows. This config
controls the maximum number of instants to retain in the active timeline. |
+| hoodie.keep.min.commits | 20 (Optional) | Similar to
hoodie.keep.max.commits, but controls the minimum number of instants to retain
in the active timeline.
|
+| hoodie.timeline.compaction.batch.size | 10 (Optional) | Controls the number
of parquet files to compact in a single compaction run at the current level of
the LSM tree.
|
For more advanced configs refer
[here](https://hudi.apache.org/docs/next/configurations#Archival-Configs-advanced-configs).
#### Flink Options
Flink jobs using SQL can be configured through the options in the WITH clause.
The actual datasource-level configs are listed below.
-| Config Name
| Default | Description
|
-|
----------------------------------------------------------------------------------------------
| --------------------------------------- |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
-| archive.max_commits |
50 (Optional) | Max number of commits to keep before
archiving older commits into a sequential log, default 50<br /><br /> `Config
Param: ARCHIVE_MAX_COMMITS`
|
-| archive.min_commits |
40 (Optional) | Min number of commits to keep before
archiving older commits into a sequential log, default 40<br /><br /> `Config
Param: ARCHIVE_MIN_COMMITS`
|
+| Config Name | Default | Description
|
+|---------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| archive.max_commits | 50 (Optional) | Max number of
commits to keep before archiving older commits into a sequential log, default
50<br /><br /> `Config Param: ARCHIVE_MAX_COMMITS` |
+| archive.min_commits | 40 (Optional) | Min number of
commits to keep before archiving older commits into a sequential log, default
40<br /><br /> `Config Param: ARCHIVE_MIN_COMMITS` |
+| hoodie.timeline.compaction.batch.size | 10 (Optional) | Controls the number
of parquet files to compact in a single compaction run at the current level of
the LSM tree. |
Refer [here](https://hudi.apache.org/docs/next/configurations#Flink-Options)
for more details.
diff --git a/website/docs/writing_data.md b/website/docs/writing_data.md
index a4bc19fd372..522259fab9e 100644
--- a/website/docs/writing_data.md
+++ b/website/docs/writing_data.md
@@ -532,4 +532,80 @@ You can write the data using the SQL `INSERT INTO`
statements:
INSERT INTO hudi_table select ... from ...;
```
-**Note**: INSERT OVERWRITE is not supported yet but already on the roadmap.
\ No newline at end of file
+**Note**: INSERT OVERWRITE is not supported yet but already on the roadmap.
+
+### Non-Blocking Concurrency Control (Experimental)
+
+Hudi Flink supports a new non-blocking concurrency control mode, where
multiple writer tasks can be executed
+concurrently without blocking each other. One can read more about this mode in
+the [concurrency control](/docs/next/concurrency_control#model-c-multi-writer)
docs. Let us see it in action here.
+
+In the example below, we have two streaming ingestion pipelines that
concurrently update the same table. One of the
+pipelines is responsible for the compaction and cleaning table services, while
the other pipeline handles only data
+ingestion.
+
+```sql
+-- set the checkpointing interval to 30 seconds
+execution.checkpointing.interval: 30000
+state.backend: rocksdb
+
+-- This is a datagen source that generates records continuously
+CREATE TABLE sourceT (
+ uuid varchar(20),
+ name varchar(10),
+ age int,
+ ts timestamp(3),
+ `partition` as 'par1'
+) WITH (
+ 'connector' = 'datagen',
+ 'rows-per-second' = '200'
+);
+
+-- pipeline1: compaction and cleaning services are enabled by default
+CREATE TABLE t1(
+ uuid varchar(20),
+ name varchar(10),
+ age int,
+ ts timestamp(3),
+ `partition` varchar(20)
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'BUCKET',
+ 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+ 'write.tasks' = '2'
+);
+
+-- pipeline2: disable the compaction and cleaning services manually
+CREATE TABLE t1_2(
+ uuid varchar(20),
+ name varchar(10),
+ age int,
+ ts timestamp(3),
+ `partition` varchar(20)
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'BUCKET',
+ 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+ 'write.tasks' = '2',
+ 'compaction.schedule.enabled' = 'false',
+ 'compaction.async.enabled' = 'false',
+ 'clean.async.enabled' = 'false'
+);
+
+-- submit the pipelines
+insert into t1 select * from sourceT;
+insert into t1_2 select * from sourceT;
+
+select * from t1 limit 20;
+```
+
+As you can see from the above example, we have two pipelines with multiple
tasks that concurrently write to the
+same table. To use the new concurrency mode, all you need to do is set
`hoodie.write.concurrency.mode`
+to `NON_BLOCKING_CONCURRENCY_CONTROL`. The `write.tasks` option specifies the
number of write tasks used
+for writing to the table. The `compaction.schedule.enabled`,
`compaction.async.enabled`
+and `clean.async.enabled` options disable the compaction and cleaning services
for the second pipeline,
+ensuring that these table services are not executed twice for the same table.
diff --git a/website/static/assets/images/lsm_tree.png
b/website/static/assets/images/lsm_tree.png
new file mode 100644
index 00000000000..e277f52556f
Binary files /dev/null and b/website/static/assets/images/lsm_tree.png differ