This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new e5caa41  [HUDI-1679] Concurrency Control in Hudi (#2698)
e5caa41 is described below

commit e5caa41de16cfe3213e638237e0dff46ddf1bf96
Author: n3nash <[email protected]>
AuthorDate: Mon Mar 22 01:20:15 2021 -0700

    [HUDI-1679] Concurrency Control in Hudi (#2698)
---
 docs/_data/navigation.yml             |   2 +
 docs/_docs/2_4_configurations.md      |  63 ++++++++++++++
 docs/_docs/2_9_concurrency_control.md | 151 ++++++++++++++++++++++++++++++++++
 3 files changed, 216 insertions(+)

diff --git a/docs/_data/navigation.yml b/docs/_data/navigation.yml
index 5803a43..114bed3 100644
--- a/docs/_data/navigation.yml
+++ b/docs/_data/navigation.yml
@@ -28,6 +28,8 @@ docs:
         url: /docs/use_cases.html
       - title: "Writing Data"
         url: /docs/writing_data.html
+      - title: "Concurrency Control"
+        url: /docs/concurrency_control.html
       - title: "Querying Data"
         url: /docs/querying_data.html
       - title: "Configuration"
diff --git a/docs/_docs/2_4_configurations.md b/docs/_docs/2_4_configurations.md
index ec35e64..e176550 100644
--- a/docs/_docs/2_4_configurations.md
+++ b/docs/_docs/2_4_configurations.md
@@ -824,3 +824,66 @@ Property: `hoodie.write.commit.callback.kafka.acks` <br/>
 ##### CALLBACK_KAFKA_RETRIES
 Property: `hoodie.write.commit.callback.kafka.retries` <br/>
 <span style="color:grey">Times to retry. 3 by default</span>
+
+### Locking configs
+Configs that control the locking mechanism used when [WriteConcurrencyMode=optimistic_concurrency_control](#WriteConcurrencyMode) is enabled. <br/>
+[withLockConfig](#withLockConfig) (HoodieLockConfig) <br/>
+
+#### withLockProvider(lockProvider = org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider) {#withLockProvider}
+Property: `hoodie.writer.lock.provider` <br/>
+<span style="color:grey">Lock provider class name. Users can provide their own implementation of LockProvider, which should be a subclass of org.apache.hudi.common.lock.LockProvider</span>
+
+#### withZkQuorum(zkQuorum) {#withZkQuorum}
+Property: `hoodie.writer.lock.zookeeper.url` <br/>
+<span style="color:grey">Set the comma separated list of Zookeeper servers to connect to</span>
+
+#### withZkBasePath(zkBasePath) {#withZkBasePath}
+Property: `hoodie.writer.lock.zookeeper.base_path` [Required] <br/>
+<span style="color:grey">The base path on Zookeeper under which to create a ZNode to acquire the lock. This should be common for all jobs writing to the same table</span>
+
+#### withZkPort(zkPort) {#withZkPort}
+Property: `hoodie.writer.lock.zookeeper.port` [Required] <br/>
+<span style="color:grey">The connection port to be used for Zookeeper</span>
+
+#### withZkLockKey(zkLockKey) {#withZkLockKey}
+Property: `hoodie.writer.lock.zookeeper.lock_key` [Required] <br/>
+<span style="color:grey">Key name under base_path at which to create a ZNode and acquire the lock. The final path on Zookeeper will look like base_path/lock_key. We recommend setting this to the table name</span>
+
+#### withZkConnectionTimeoutInMs(connectionTimeoutInMs = 15000) {#withZkConnectionTimeoutInMs}
+Property: `hoodie.writer.lock.zookeeper.connection_timeout_ms` <br/>
+<span style="color:grey">How long to wait when connecting to ZooKeeper before considering the connection a failure</span>
+
+#### withZkSessionTimeoutInMs(sessionTimeoutInMs = 60000) {#withZkSessionTimeoutInMs}
+Property: `hoodie.writer.lock.zookeeper.session_timeout_ms` <br/>
+<span style="color:grey">How long to wait after losing a connection to ZooKeeper before the session is expired</span>
+
+#### withNumRetries(num_retries = 3) {#withNumRetries}
+Property: `hoodie.writer.lock.num_retries` <br/>
+<span style="color:grey">Maximum number of times the lock provider client will retry to acquire the lock</span>
+
+#### withRetryWaitTimeInMillis(retryWaitTimeInMillis = 5000) {#withRetryWaitTimeInMillis}
+Property: `hoodie.writer.lock.wait_time_ms_between_retry` <br/>
+<span style="color:grey">Initial amount of time to wait between retries by the lock provider client</span>
+
+#### withHiveDatabaseName(hiveDatabaseName) {#withHiveDatabaseName}
+Property: `hoodie.writer.lock.hivemetastore.database` [Required] <br/>
+<span style="color:grey">The Hive database to acquire lock against</span>
+
+#### withHiveTableName(hiveTableName) {#withHiveTableName}
+Property: `hoodie.writer.lock.hivemetastore.table` [Required] <br/>
+<span style="color:grey">The Hive table under the Hive database to acquire the lock against</span>
+
+#### withClientNumRetries(clientNumRetries = 0) {#withClientNumRetries}
+Property: `hoodie.writer.lock.client.num_retries` <br/>
+<span style="color:grey">Maximum number of times the Hudi client will additionally retry to acquire the lock</span>
+
+#### withRetryWaitTimeInMillis(retryWaitTimeInMillis = 10000) {#withClientRetryWaitTimeInMillis}
+Property: `hoodie.writer.lock.client.wait_time_ms_between_retry` <br/>
+<span style="color:grey">Amount of time to wait between retries from the Hudi client</span>
+
+#### withConflictResolutionStrategy(conflictResolutionStrategy = org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy) {#withConflictResolutionStrategy}
+Property: `hoodie.writer.lock.conflict.resolution.strategy` <br/>
+<span style="color:grey">Conflict resolution strategy class name; this should be a subclass of org.apache.hudi.client.transaction.ConflictResolutionStrategy</span>
+
+
+
diff --git a/docs/_docs/2_9_concurrency_control.md b/docs/_docs/2_9_concurrency_control.md
new file mode 100644
index 0000000..e555d39
--- /dev/null
+++ b/docs/_docs/2_9_concurrency_control.md
@@ -0,0 +1,151 @@
+---
+title: "Concurrent Writes to Hudi Tables"
+permalink: /docs/concurrency_control.html
+summary: In this page, we will discuss how to perform concurrent writes to Hudi Tables.
+toc: true
+last_modified_at: 2021-03-19T15:59:57-04:00
+---
+
+In this section, we will cover Hudi's concurrency model and describe ways to ingest data into a Hudi Table from multiple writers, using the [DeltaStreamer](#deltastreamer) tool as well as the [Hudi datasource](#datasource-writer).
+
+## Supported Concurrency Controls
+
+- **MVCC** : Hudi table services such as compaction, cleaning and clustering leverage Multi Version Concurrency Control to provide snapshot isolation
+between multiple table service writers and readers. Additionally, using MVCC, Hudi provides snapshot isolation between an ingestion writer and multiple concurrent readers.
+  With this model, Hudi supports running any number of table service jobs concurrently, without any concurrency conflict.
+  This is made possible by ensuring that scheduling plans for such table services always happen in single writer mode, which avoids conflicts and race conditions.
+
+- **[NEW] OPTIMISTIC CONCURRENCY** : Write operations such as the ones described above (UPSERT, INSERT, etc.) leverage optimistic concurrency control to enable multiple ingestion writers to
+write to the same Hudi Table. Hudi supports `file level OCC`, i.e., for any 2 commits (or writers) happening to the same table, if their writes do not touch overlapping files, both writers are allowed to succeed.
+  This feature is currently *experimental* and requires either Zookeeper or HiveMetastore to acquire locks.
+
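To make the file-level check concrete, here is a minimal illustrative sketch (not Hudi's actual implementation; the class and method names are hypothetical): two concurrent commits conflict only if the sets of files they write to overlap.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FileLevelOccSketch {
    // Hypothetical helper, not a Hudi API: under file level OCC, two commits
    // conflict only when the sets of files they modify intersect.
    public static boolean hasConflict(Set<String> filesA, Set<String> filesB) {
        Set<String> overlap = new HashSet<>(filesA);
        overlap.retainAll(filesB);
        return !overlap.isEmpty();
    }

    public static void main(String[] args) {
        Set<String> writer1 = new HashSet<>(Arrays.asList("f1.parquet", "f2.parquet"));
        Set<String> writer2 = new HashSet<>(Arrays.asList("f3.parquet"));
        Set<String> writer3 = new HashSet<>(Arrays.asList("f2.parquet"));
        // Disjoint file sets: both writers are allowed to succeed.
        System.out.println(hasConflict(writer1, writer2)); // false
        // Overlapping file sets: the later writer must abort and retry.
        System.out.println(hasConflict(writer1, writer3)); // true
    }
}
```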
+It may be helpful to understand the different guarantees provided by [write operations](/docs/writing_data.html#write-operations) via the Hudi datasource or the delta streamer.
+
+## Single Writer Guarantees
+
+ - *UPSERT Guarantee*: The target table will NEVER show duplicates.
+ - *INSERT Guarantee*: The target table will NEVER have duplicates if [dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+ - *BULK_INSERT Guarantee*: The target table will NEVER have duplicates if [dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+ - *INCREMENTAL PULL Guarantee*: Data consumption and checkpoints are NEVER out of order.
+
+## Multi Writer Guarantees
+
+With multiple writers using OCC, some of the above guarantees change as follows:
+
+- *UPSERT Guarantee*: The target table will NEVER show duplicates.
+- *INSERT Guarantee*: The target table MIGHT have duplicates even if [dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+- *BULK_INSERT Guarantee*: The target table MIGHT have duplicates even if [dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+- *INCREMENTAL PULL Guarantee*: Data consumption and checkpoints MIGHT be out of order due to multiple writer jobs finishing at different times.
+
+## Enabling Multi Writing
+
+The following properties need to be set properly to turn on optimistic concurrency control.
+
+```
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.failed.writes.cleaner.policy=LAZY
+hoodie.writer.lock.provider=<lock-provider-classname>
+```
+
+There are 2 different server based lock providers that require different configurations to be set.
+
+**`Zookeeper`** based lock provider
+
+```
+hoodie.writer.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
+hoodie.writer.lock.zookeeper.url
+hoodie.writer.lock.zookeeper.port
+hoodie.writer.lock.wait_time_ms_between_retry
+hoodie.writer.lock.num_retries
+hoodie.writer.lock.zookeeper.lock_key
+hoodie.writer.lock.zookeeper.base_path
+```
+
+**`HiveMetastore`** based lock provider
+
+```
+hoodie.writer.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
+hoodie.writer.lock.hivemetastore.database
+hoodie.writer.lock.hivemetastore.table
+hoodie.writer.lock.wait_time_ms_between_retry
+hoodie.writer.lock.num_retries
+```
+
+*The HiveMetastore URIs are picked up from the Hadoop configuration file loaded at runtime.*
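As an illustration, the HiveMetastore-based properties might be filled in as follows (the database and table names here are placeholders, not recommendations):

```
hoodie.writer.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.writer.lock.hivemetastore.database=my_db
hoodie.writer.lock.hivemetastore.table=my_table
```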
+
+## Datasource Writer
+
+The `hudi-spark` module offers the DataSource API to write (and read) a Spark DataFrame into a Hudi table.
+
+Following is an example of how to use optimistic_concurrency_control via the Spark datasource:
+
+```scala
+inputDF.write.format("hudi")
+       .options(getQuickstartWriteConfigs)
+       .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
+       .option("hoodie.failed.writes.cleaner.policy", "LAZY")
+       .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+       .option("hoodie.writer.lock.zookeeper.url", "zookeeper")
+       .option("hoodie.writer.lock.zookeeper.port", "2181")
+       .option("hoodie.writer.lock.wait_time_ms_between_retry", "12000")
+       .option("hoodie.writer.lock.num_retries", "2")
+       .option("hoodie.writer.lock.zookeeper.lock_key", "test_table")
+       .option("hoodie.writer.lock.zookeeper.base_path", "/test")
+       .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
+       .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
+       .option(TABLE_NAME, tableName)
+       .mode(Overwrite)
+       .save(basePath)
+```
+
+## DeltaStreamer
+
+The `HoodieDeltaStreamer` utility (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka.
+
+Using optimistic_concurrency_control via the delta streamer requires adding the above configs to the properties file that can be passed to the
+job. For example, adding the configs to the kafka-source.properties file and passing them to the deltastreamer will enable optimistic concurrency.
+A deltastreamer job can then be triggered as follows:
+
+```sh
+[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
+  --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
+  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
+  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
+  --source-ordering-field impresssiontime \
+  --target-base-path file:///tmp/hudi-deltastreamer-op \
+  --target-table uber.impressions \
+  --op BULK_INSERT
+```
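For instance, the concurrency-related entries added to the kafka-source.properties file could look like the following sketch (the Zookeeper host and port mirror the datasource example above and are placeholders; the job's existing source and schema properties are omitted):

```
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.failed.writes.cleaner.policy=LAZY
hoodie.writer.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.writer.lock.zookeeper.url=zookeeper
hoodie.writer.lock.zookeeper.port=2181
```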
+
+## Best Practices when using Optimistic Concurrency Control
+
+Concurrent writing to Hudi tables requires acquiring a lock with either Zookeeper or HiveMetastore. For several reasons, you might want to configure retries to allow your application to acquire the lock:
+1. Network connectivity issues or excessive load on the servers can increase the time needed for lock acquisition, resulting in timeouts
+2. Running a large number of concurrent jobs that write to the same Hudi table can result in contention during lock acquisition, which can cause timeouts
+3. In some scenarios of conflict resolution, Hudi commit operations might take up to tens of seconds while the lock is being held. This can result in timeouts for other jobs waiting to acquire a lock.
+
+Set the correct native lock provider client retries. NOTE that sometimes these settings are set on the server once and all clients inherit the same configs. Please check your settings before enabling optimistic concurrency.
+   
+```
+hoodie.writer.lock.wait_time_ms_between_retry
+hoodie.writer.lock.num_retries
+```
+
+Set the correct Hudi client retries for Zookeeper & HiveMetastore. This is useful in cases when native client retry settings cannot be changed. Please note that these retries will happen in addition to any native client retries that you may have set.
+
+```
+hoodie.writer.lock.client.wait_time_ms_between_retry
+hoodie.writer.lock.client.num_retries
+```
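Because the client-level retries sit on top of the native ones, the worst-case number of lock attempts can multiply. A back-of-the-envelope sketch, assuming each client-level retry re-invokes the provider's own retry loop (the class and method names are illustrative, not a Hudi API):

```java
public class LockRetryBudget {
    // Illustrative arithmetic, not a Hudi API: if each client-level attempt
    // performs the full set of native lock-provider retries, the worst-case
    // total attempt count is the product of the two (attempts = retries + 1).
    public static int worstCaseAttempts(int clientNumRetries, int nativeNumRetries) {
        return (clientNumRetries + 1) * (nativeNumRetries + 1);
    }

    public static void main(String[] args) {
        // e.g. 2 client retries layered over 3 native retries
        System.out.println(worstCaseAttempts(2, 3)); // 12
    }
}
```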
+
+*The right values for these settings depend on the use case; some defaults have been provided for general cases.*
+
+## Disabling Multi Writing
+
+Remove the following settings that were used to enable multi-writer, or override them with the default values shown below.
+
+```
+hoodie.write.concurrency.mode=single_writer
+hoodie.failed.writes.cleaner.policy=EAGER
+```
\ No newline at end of file
