This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a8312a9b8c3 [HUDI-5887] Distinguish the single writer enabling
metadata table and multi-writer use cases for lock guard (#8111)
a8312a9b8c3 is described below
commit a8312a9b8c39f4baabf753974fa092c4767abb72
Author: Danny Chan <[email protected]>
AuthorDate: Wed Mar 8 14:46:01 2023 +0800
[HUDI-5887] Distinguish the single writer enabling metadata table and
multi-writer use cases for lock guard (#8111)
* distinguish the single-writer (with metadata table enabled) use case from
the multi-writer use case for the lock guard,
which gives us the chance to apply single-writer optimizations: for example,
some restrictions on metadata table compaction/initialization can be loosened
or eliminated
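* fix the isLockProviderPropertySet flag within the HoodieWriteConfig builder
so that it checks the property key string (LOCK_PROVIDER_CLASS_NAME.key())
rather than the config object itself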
---
.../org/apache/hudi/client/BaseHoodieClient.java | 3 ++-
.../apache/hudi/client/BaseHoodieWriteClient.java | 22 ++++++----------
.../DirectMarkerTransactionManager.java | 7 +++---
.../client/transaction/TransactionManager.java | 19 +++++++-------
.../org/apache/hudi/config/HoodieWriteConfig.java | 15 ++++++-----
.../action/commit/BaseCommitActionExecutor.java | 2 +-
.../apache/hudi/config/TestHoodieWriteConfig.java | 29 +++++++++++-----------
.../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +-
8 files changed, 47 insertions(+), 52 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index a5faaffe3d7..b8409c7d19b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -170,7 +170,8 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
*
* @param table A hoodie table instance created after transaction starts so
that the latest commits and files are captured.
* @param metadata Current committing instant's metadata
- * @param pendingInflightAndRequestedInstants
+ * @param pendingInflightAndRequestedInstants Pending instants on the
timeline
+ *
* @see {@link BaseHoodieWriteClient#preCommit}
* @see {@link BaseHoodieTableServiceClient#preCommit}
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index a248e6bd9b4..594d32de939 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -493,7 +493,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
public void preWrite(String instantTime, WriteOperationType
writeOperationType,
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- this.lastCompletedTxnAndMetadata =
txnManager.isOptimisticConcurrencyControlEnabled()
+ this.lastCompletedTxnAndMetadata = txnManager.isNeedsLockGuard()
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) :
Option.empty();
this.pendingInflightAndRequestedInstants =
TransactionUtils.getInflightAndRequestedInstants(metaClient);
this.pendingInflightAndRequestedInstants.remove(instantTime);
@@ -1086,7 +1086,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
*
* @param extraMetadata Metadata to pass onto the scheduled service instant
* @param tableServiceType Type of table service to schedule
- * @return
+ *
+ * @return The given instant time option or empty if no table service plan
is scheduled
*/
public Option<String> scheduleTableService(Option<Map<String, String>>
extraMetadata, TableServiceType tableServiceType) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1098,20 +1099,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
*
* @param extraMetadata Metadata to pass onto the scheduled service instant
* @param tableServiceType Type of table service to schedule
- * @return
+ *
+ * @return The given instant time option or empty if no table service plan
is scheduled
*/
- public Option<String> scheduleTableService(String instantTime,
Option<Map<String, String>> extraMetadata,
- TableServiceType
tableServiceType) {
- // A lock is required to guard against race conditions between an on-going
writer and scheduling a table service.
- final Option<HoodieInstant> inflightInstant = Option.of(new
HoodieInstant(HoodieInstant.State.REQUESTED,
- tableServiceType.getAction(), instantTime));
- try {
- this.txnManager.beginTransaction(inflightInstant, Option.empty());
- LOG.info("Scheduling table service " + tableServiceType);
- return tableServiceClient.scheduleTableServiceInternal(instantTime,
extraMetadata, tableServiceType);
- } finally {
- this.txnManager.endTransaction(inflightInstant);
- }
+ public Option<String> scheduleTableService(String instantTime,
Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
+ return tableServiceClient.scheduleTableService(instantTime, extraMetadata,
tableServiceType);
}
public HoodieMetrics getMetrics() {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index 84fac2db004..f27af9a2549 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -42,13 +42,12 @@ public class DirectMarkerTransactionManager extends
TransactionManager {
private final String filePath;
public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem
fs, String partitionPath, String fileId) {
- super(new LockManager(config, fs, createUpdatedLockProps(config,
partitionPath, fileId)),
-
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
+ super(new LockManager(config, fs, createUpdatedLockProps(config,
partitionPath, fileId)), config.needsLockGuard());
this.filePath = partitionPath + "/" + fileId;
}
public void beginTransaction(String newTxnOwnerInstantTime) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and "
+ filePath);
lockManager.lock();
@@ -58,7 +57,7 @@ public class DirectMarkerTransactionManager extends
TransactionManager {
}
public void endTransaction(String currentTxnOwnerInstantTime) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction ending with transaction owner " +
currentTxnOwnerInstantTime
+ " for " + filePath);
if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)),
Option.empty(), Option.empty())) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index 7fddf8a944b..40744a2f5d2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -37,23 +37,22 @@ public class TransactionManager implements Serializable {
protected static final Logger LOG =
LogManager.getLogger(TransactionManager.class);
protected final LockManager lockManager;
- protected final boolean isOptimisticConcurrencyControlEnabled;
+ protected final boolean needsLockGuard;
protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
- this(new LockManager(config, fs),
-
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
+ this(new LockManager(config, fs), config.needsLockGuard());
}
- protected TransactionManager(LockManager lockManager, boolean
isOptimisticConcurrencyControlEnabled) {
+ protected TransactionManager(LockManager lockManager, boolean
needsLockGuard) {
this.lockManager = lockManager;
- this.isOptimisticConcurrencyControlEnabled =
isOptimisticConcurrencyControlEnabled;
+ this.needsLockGuard = needsLockGuard;
}
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant>
lastCompletedTxnOwnerInstant) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction starting for " + newTxnOwnerInstant
+ " with latest completed transaction instant " +
lastCompletedTxnOwnerInstant);
lockManager.lock();
@@ -64,7 +63,7 @@ public class TransactionManager implements Serializable {
}
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction ending with transaction owner " +
currentTxnOwnerInstant);
if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
lockManager.unlock();
@@ -85,7 +84,7 @@ public class TransactionManager implements Serializable {
}
public void close() {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
lockManager.close();
LOG.info("Transaction manager closed");
}
@@ -103,7 +102,7 @@ public class TransactionManager implements Serializable {
return currentTxnOwnerInstant;
}
- public boolean isOptimisticConcurrencyControlEnabled() {
- return isOptimisticConcurrencyControlEnabled;
+ public boolean isNeedsLockGuard() {
+ return needsLockGuard;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 886112cae16..f06f0a9cfc5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2405,6 +2405,13 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBooleanOrDefault(RELEASE_RESOURCE_ENABLE);
}
+ /**
+ * Returns whether the explicit guard of lock is required.
+ */
+ public boolean needsLockGuard() {
+ return isMetadataTableEnabled() ||
getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+ }
+
/**
* Layout configs.
*/
@@ -2946,8 +2953,7 @@ public class HoodieWriteConfig extends HoodieConfig {
// isLockProviderPropertySet must be fetched before setting defaults of
HoodieLockConfig
final TypedProperties writeConfigProperties = writeConfig.getProps();
- final boolean isLockProviderPropertySet =
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
- ||
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
+ final boolean isLockProviderPropertySet =
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
writeConfig.setDefaultOnCondition(!isLockConfigSet,
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
@@ -2971,13 +2977,10 @@ public class HoodieWriteConfig extends HoodieConfig {
// This is targeted at Single writer with async table services
// If user does not set the lock provider, likely that the
concurrency mode is not set either
// Override the configs for metadata table
- writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName());
- LOG.info(String.format("Automatically set %s=%s and %s=%s since
user has not set the "
+ LOG.info(String.format("Automatically set %s=%s since user has not
set the "
+ "lock provider for single writer with async table
services",
- WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName()));
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 8f3a0244d2e..0ab7efec82d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -92,7 +92,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
this.taskContextSupplier = context.getTaskContextSupplier();
// TODO : Remove this once we refactor and move out autoCommit method from
here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new
TransactionManager(config, table.getMetaClient().getFs())) : Option.empty();
- if (this.txnManagerOption.isPresent() &&
this.txnManagerOption.get().isOptimisticConcurrencyControlEnabled()) {
+ if (this.txnManagerOption.isPresent() &&
this.txnManagerOption.get().isNeedsLockGuard()) {
// these txn metadata are only needed for auto commit when optimistic
concurrent control is also enabled
this.lastCompletedTxn =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
this.pendingInflightAndRequestedInstants =
TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index e7afa50a59d..1f4d6b18587 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -140,8 +140,10 @@ public class TestHoodieWriteConfig {
put(ASYNC_CLEAN.key(), "false");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
- }), true, true, true,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+ }), true, true, true,
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ inProcessLockProviderClassName);
// 2. Async clean
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new
HashMap<String, String>() {
@@ -153,8 +155,10 @@ public class TestHoodieWriteConfig {
put(ASYNC_CLEAN.key(), "true");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
- }), true, true, true,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+ }), true, true, true,
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ inProcessLockProviderClassName);
// 3. Async compaction configured
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new
HashMap<String, String>() {
@@ -168,12 +172,8 @@ public class TestHoodieWriteConfig {
}
}), true,
tableType == HoodieTableType.MERGE_ON_READ, true,
- tableType == HoodieTableType.MERGE_ON_READ
- ? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL
- :
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
- tableType == HoodieTableType.MERGE_ON_READ
- ? HoodieFailedWritesCleaningPolicy.LAZY
- :
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
tableType == HoodieTableType.MERGE_ON_READ
? inProcessLockProviderClassName
: HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
@@ -205,8 +205,9 @@ public class TestHoodieWriteConfig {
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, false,
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ inProcessLockProviderClassName);
}
@ParameterizedTest
@@ -289,8 +290,8 @@ public class TestHoodieWriteConfig {
if (writeConfig.areAnyTableServicesAsync()) {
verifyConcurrencyControlRelatedConfigs(writeConfig,
true, true, true,
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY,
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
InProcessLockProvider.class.getName());
} else {
verifyConcurrencyControlRelatedConfigs(writeConfig,
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3d4f6a3873b..1d4697d709d 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -280,7 +280,7 @@ public class HoodieFlinkWriteClient<T> extends
* should be called before the Driver starts a new transaction.
*/
public void preTxn(HoodieTableMetaClient metaClient) {
- if (txnManager.isOptimisticConcurrencyControlEnabled()) {
+ if (txnManager.isNeedsLockGuard()) {
// refresh the meta client which is reused
metaClient.reloadActiveTimeline();
this.lastCompletedTxnAndMetadata =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);