This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new e5caa41 [HUDI-1679] Concurrency Control in Hudi (#2698)
e5caa41 is described below
commit e5caa41de16cfe3213e638237e0dff46ddf1bf96
Author: n3nash <[email protected]>
AuthorDate: Mon Mar 22 01:20:15 2021 -0700
[HUDI-1679] Concurrency Control in Hudi (#2698)
---
docs/_data/navigation.yml | 2 +
docs/_docs/2_4_configurations.md | 63 ++++++++++++++
docs/_docs/2_9_concurrency_control.md | 151 ++++++++++++++++++++++++++++++++++
3 files changed, 216 insertions(+)
diff --git a/docs/_data/navigation.yml b/docs/_data/navigation.yml
index 5803a43..114bed3 100644
--- a/docs/_data/navigation.yml
+++ b/docs/_data/navigation.yml
@@ -28,6 +28,8 @@ docs:
url: /docs/use_cases.html
- title: "Writing Data"
url: /docs/writing_data.html
+ - title: "Concurrency Control"
+ url: /docs/concurrency_control.html
- title: "Querying Data"
url: /docs/querying_data.html
- title: "Configuration"
diff --git a/docs/_docs/2_4_configurations.md b/docs/_docs/2_4_configurations.md
index ec35e64..e176550 100644
--- a/docs/_docs/2_4_configurations.md
+++ b/docs/_docs/2_4_configurations.md
@@ -824,3 +824,66 @@ Property: `hoodie.write.commit.callback.kafka.acks` <br/>
##### CALLBACK_KAFKA_RETRIES
Property: `hoodie.write.commit.callback.kafka.retries` <br/>
<span style="color:grey">Times to retry. 3 by default</span>
+
+### Locking configs
+Configs that control locking mechanisms if
[WriteConcurrencyMode=optimistic_concurrency_control](#WriteConcurrencyMode) is
enabled
+[withLockConfig](#withLockConfig) (HoodieLockConfig) <br/>
+
+#### withLockProvider(lockProvider =
org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider)
{#withLockProvider}
+Property: `hoodie.writer.lock.provider` <br/>
+<span style="color:grey">Lock provider class name. Users can provide their own implementation, which should be a subclass of org.apache.hudi.common.lock.LockProvider</span>
+
+#### withZkQuorum(zkQuorum) {#withZkQuorum}
+Property: `hoodie.writer.lock.zookeeper.url` <br/>
+<span style="color:grey">Comma-separated list of Zookeeper servers to connect to</span>
+
+#### withZkBasePath(zkBasePath) {#withZkBasePath}
+Property: `hoodie.writer.lock.zookeeper.base_path` [Required] <br/>
+<span style="color:grey">The base path on Zookeeper under which to create a
ZNode to acquire the lock. This should be common for all jobs writing to the
same table</span>
+
+#### withZkPort(zkPort) {#withZkPort}
+Property: `hoodie.writer.lock.zookeeper.port` [Required] <br/>
+<span style="color:grey">The connection port to be used for Zookeeper</span>
+
+#### withZkLockKey(zkLockKey) {#withZkLockKey}
+Property: `hoodie.writer.lock.zookeeper.lock_key` [Required] <br/>
+<span style="color:grey">Key name under base_path at which to create a ZNode
and acquire lock. Final path on zk will look like base_path/lock_key. We
recommend setting this to the table name</span>
+
+#### withZkConnectionTimeoutInMs(connectionTimeoutInMs = 15000)
{#withZkConnectionTimeoutInMs}
+Property: `hoodie.writer.lock.zookeeper.connection_timeout_ms` <br/>
+<span style="color:grey">How long to wait when connecting to ZooKeeper before
considering the connection a failure</span>
+
+#### withZkSessionTimeoutInMs(sessionTimeoutInMs = 60000)
{#withZkSessionTimeoutInMs}
+Property: `hoodie.writer.lock.zookeeper.session_timeout_ms` <br/>
+<span style="color:grey">How long to wait after losing a connection to
ZooKeeper before the session is expired</span>
+
+#### withNumRetries(num_retries = 3) {#withNumRetries}
+Property: `hoodie.writer.lock.num_retries` <br/>
+<span style="color:grey">Maximum number of times to retry by lock provider
client</span>
+
+#### withRetryWaitTimeInMillis(retryWaitTimeInMillis = 5000)
{#withRetryWaitTimeInMillis}
+Property: `hoodie.writer.lock.wait_time_ms_between_retry` <br/>
+<span style="color:grey">Initial amount of time to wait between retries by
lock provider client</span>
+
+#### withHiveDatabaseName(hiveDatabaseName) {#withHiveDatabaseName}
+Property: `hoodie.writer.lock.hivemetastore.database` [Required] <br/>
+<span style="color:grey">The Hive database to acquire lock against</span>
+
+#### withHiveTableName(hiveTableName) {#withHiveTableName}
+Property: `hoodie.writer.lock.hivemetastore.table` [Required] <br/>
+<span style="color:grey">The Hive table under the hive database to acquire
lock against</span>
+
+#### withClientNumRetries(clientNumRetries = 0) {#withClientNumRetries}
+Property: `hoodie.writer.lock.client.num_retries` <br/>
+<span style="color:grey">Maximum number of additional attempts by the hudi client to acquire the lock</span>
+
+#### withClientRetryWaitTimeInMillis(retryWaitTimeInMillis = 10000) {#withClientRetryWaitTimeInMillis}
+Property: `hoodie.writer.lock.client.wait_time_ms_between_retry` <br/>
+<span style="color:grey">Amount of time to wait between retries from the hudi
client</span>
+
+#### withConflictResolutionStrategy(conflictResolutionStrategy = org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy) {#withConflictResolutionStrategy}
+Property: `hoodie.writer.lock.conflict.resolution.strategy` <br/>
+<span style="color:grey">Conflict resolution strategy class name; this should be a subclass of org.apache.hudi.client.transaction.ConflictResolutionStrategy</span>
+
+
+
diff --git a/docs/_docs/2_9_concurrency_control.md
b/docs/_docs/2_9_concurrency_control.md
new file mode 100644
index 0000000..e555d39
--- /dev/null
+++ b/docs/_docs/2_9_concurrency_control.md
@@ -0,0 +1,151 @@
+---
+title: "Concurrent Writes to Hudi Tables"
+permalink: /docs/concurrency_control.html
+summary: In this page, we will discuss how to perform concurrent writes to
Hudi Tables.
+toc: true
+last_modified_at: 2021-03-19T15:59:57-04:00
+---
+
+In this section, we will cover Hudi's concurrency model and describe ways to ingest data into a Hudi Table from multiple writers, using the [DeltaStreamer](#deltastreamer) tool as well as
+using the [Hudi datasource](#datasource-writer).
+
+## Supported Concurrency Controls
+
+- **MVCC** : Hudi table services such as compaction, cleaning and clustering leverage Multi Version Concurrency Control to provide snapshot isolation
+between multiple table service writers and readers. Additionally, using MVCC, Hudi provides snapshot isolation between an ingestion writer and multiple concurrent readers.
+ With this model, Hudi supports running any number of table service jobs concurrently, without any concurrency conflict.
+ This is made possible by ensuring that the plans for such table services are always scheduled in single writer mode, which avoids conflicts and race conditions.
+
+- **[NEW] OPTIMISTIC CONCURRENCY** : Write operations such as UPSERT and INSERT leverage optimistic concurrency control to enable multiple ingestion writers to write to
+the same Hudi Table. Hudi supports `file level OCC`, i.e., for any 2 commits (or writers) happening to the same table, if they do not write to overlapping files, both writers are allowed to succeed.
+ This feature is currently *experimental* and requires either Zookeeper or HiveMetastore to acquire locks.
+
+It may be helpful to understand the different guarantees provided by [write
operations](/docs/writing_data.html#write-operations) via Hudi datasource or
the delta streamer.
+
+## Single Writer Guarantees
+
+ - *UPSERT Guarantee*: The target table will NEVER show duplicates.
+ - *INSERT Guarantee*: The target table will NEVER have duplicates if [dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+ - *BULK_INSERT Guarantee*: The target table will NEVER have duplicates if
[dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+ - *INCREMENTAL PULL Guarantee*: Data consumption and checkpoints are NEVER
out of order.
+
+## Multi Writer Guarantees
+
+With multiple writers using OCC, some of the above guarantees change as follows:
+
+- *UPSERT Guarantee*: The target table will NEVER show duplicates.
+- *INSERT Guarantee*: The target table MIGHT have duplicates even if
[dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+- *BULK_INSERT Guarantee*: The target table MIGHT have duplicates even if
[dedup](/docs/configurations.html#INSERT_DROP_DUPS_OPT_KEY) is enabled.
+- *INCREMENTAL PULL Guarantee*: Data consumption and checkpoints MIGHT be out
of order due to multiple writer jobs finishing at different times.
+
+## Enabling Multi Writing
+
+The following properties must be set to turn on optimistic concurrency control.
+
+```
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.failed.writes.cleaner.policy=LAZY
+hoodie.writer.lock.provider=<lock-provider-classname>
+```
+
+There are 2 server-based lock providers, each of which requires its own configuration.
+
+**`Zookeeper`** based lock provider
+
+```
+hoodie.writer.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
+hoodie.writer.lock.zookeeper.url
+hoodie.writer.lock.zookeeper.port
+hoodie.writer.lock.wait_time_ms
+hoodie.writer.lock.num_retries
+hoodie.writer.lock.zookeeper.lock_key
+hoodie.writer.lock.zookeeper.base_path
+```
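+
+As a rough illustration, a filled-in Zookeeper setup could look like the sketch below. The values are assumptions borrowed from the datasource example on this page (a server reachable as `zookeeper` on port `2181`, base path `/test`, lock key `test_table`), and the lock key and base path use the property names given in the Locking configs section of the configuration reference; replace all of them with values for your environment.
+
+```
+# Illustrative values only, not recommended defaults
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.failed.writes.cleaner.policy=LAZY
+hoodie.writer.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
+hoodie.writer.lock.zookeeper.url=zookeeper
+hoodie.writer.lock.zookeeper.port=2181
+hoodie.writer.lock.wait_time_ms=12000
+hoodie.writer.lock.num_retries=2
+hoodie.writer.lock.zookeeper.lock_key=test_table
+hoodie.writer.lock.zookeeper.base_path=/test
+```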
+
+**`HiveMetastore`** based lock provider
+
+```
+hoodie.writer.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
+hoodie.writer.lock.hivemetastore.database
+hoodie.writer.lock.hivemetastore.table
+hoodie.writer.lock.wait_time_ms
+hoodie.writer.lock.num_retries
+```
+
+`The HiveMetastore URIs are picked up from the hadoop configuration file loaded at runtime.`
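+
+A HiveMetastore setup might be filled in along the same lines. The database and table names below are hypothetical placeholders, and the retry values are simply reused from the datasource example on this page rather than being recommendations.
+
+```
+# Hypothetical database and table names; adjust to your metastore
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.failed.writes.cleaner.policy=LAZY
+hoodie.writer.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
+hoodie.writer.lock.hivemetastore.database=default
+hoodie.writer.lock.hivemetastore.table=test_table
+hoodie.writer.lock.wait_time_ms=12000
+hoodie.writer.lock.num_retries=2
+```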
+
+## Datasource Writer
+
+The `hudi-spark` module offers the DataSource API to write (and read) a Spark
DataFrame into a Hudi table.
+
+The following is an example of how to use optimistic_concurrency_control via the Spark datasource:
+
+```scala
+inputDF.write.format("hudi")
+ .options(getQuickstartWriteConfigs)
+ .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
+ .option("hoodie.failed.writes.cleaner.policy", "LAZY")
+ .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+ .option("hoodie.writer.lock.zookeeper.url", "zookeeper")
+ .option("hoodie.writer.lock.zookeeper.port", "2181")
+ .option("hoodie.writer.lock.wait_time_ms", "12000")
+ .option("hoodie.writer.lock.num_retries", "2")
+ .option("hoodie.writer.lock.zookeeper.lock_key", "test_table")
+ .option("hoodie.writer.lock.zookeeper.base_path", "/test")
+ .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
+ .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
+ .option(TABLE_NAME, tableName)
+ .mode(Overwrite)
+ .save(basePath)
+```
+
+## DeltaStreamer
+
+The `HoodieDeltaStreamer` utility (part of hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka.
+
+Using optimistic_concurrency_control via delta streamer requires adding the above configs to the properties file that is passed to the
+job. For example, adding the configs to the kafka-source.properties file and passing it to deltastreamer enables optimistic concurrency.
+A deltastreamer job can then be triggered as follows:
+
+```shell
+[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
+ --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
+ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
+ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
+ --source-ordering-field impresssiontime \
+ --target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \
+ --target-table uber.impressions \
+ --op BULK_INSERT
+```
+
+## Best Practices when using Optimistic Concurrency Control
+
+Concurrent writing to Hudi tables requires acquiring a lock with either Zookeeper or HiveMetastore. You might want to configure retries so that your application can still acquire the lock in situations such as the following:
+1. Network connectivity issues or excessive load on the servers can increase lock acquisition time, resulting in timeouts.
+2. Running a large number of concurrent jobs that write to the same hudi table can cause contention during lock acquisition, resulting in timeouts.
+3. In some conflict resolution scenarios, Hudi commit operations might take up to tens of seconds while the lock is being held. This can result in timeouts for other jobs waiting to acquire the lock.
+
+Set the correct native lock provider client retries. NOTE that sometimes these
settings are set on the server once and all clients inherit the same configs.
Please check your settings before enabling optimistic concurrency.
+
+```
+hoodie.writer.lock.wait_time_ms
+hoodie.writer.lock.num_retries
+```
+
+Set the correct hudi client retries for Zookeeper & HiveMetastore. This is
useful in cases when native client retry settings cannot be changed. Please
note that these retries will happen in addition to any native client retries
that you may have set.
+
+```
+hoodie.writer.lock.client.wait_time_ms
+hoodie.writer.lock.client.num_retries
+```
+
+*The right values for these settings depend on the use case; some defaults have been provided for general cases.*
+
+## Disabling Multi Writing
+
+Remove the settings that were used to enable multi-writer, or override them with the following default values.
+
+```
+hoodie.write.concurrency.mode=single_writer
+hoodie.failed.writes.cleaner.policy=EAGER
+```
\ No newline at end of file