This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch release-0.12.2-blockers-candidate
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to
refs/heads/release-0.12.2-blockers-candidate by this push:
new 211af1a4fd [HUDI-5414] No need to guard the table initialization by
lock for HoodieFlinkWriteClient (#7509) (#7522)
211af1a4fd is described below
commit 211af1a4fd76ce84ce80f4d1b2befe5fc9954888
Author: Danny Chan <[email protected]>
AuthorDate: Wed Dec 21 12:10:34 2022 +0800
[HUDI-5414] No need to guard the table initialization by lock for
HoodieFlinkWriteClient (#7509) (#7522)
Different from other write clients, HoodieFlinkWriteClient invokes the
dataset writing methods (#upsert or #insert)
for each batch of new data set in the long-running task. In the current impl, an
engine-specific hoodie table would be created before performing
these actions, and before the table creation, some table bootstrapping
operations are performed (such as table upgrade/downgrade, the metadata table
bootstrap). These bootstrapping operations are guarded by a transaction
lock.
In Flink, these bootstrapping operations can be avoided because they are
all performed only once on the coordinator.
The changes:
- Make BaseHoodieWriteClient#doInitTable non abstract, it now only performs
the bootstrapping operations
- Add a default impl BaseHoodieWriteClient#initMetadataTable for metadata
table bootstrap specifically
- Add a new abstract method for creating engine-specific hoodie table
(cherry picked from commit fd62a1413e74de686935672aec812aacd5c43a63)
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 49 ++++++++++++++--------
.../apache/hudi/client/HoodieFlinkWriteClient.java | 24 +++++++----
.../apache/hudi/client/HoodieJavaWriteClient.java | 14 +++----
.../org/apache/hudi/table/HoodieJavaTable.java | 2 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 20 ++++-----
.../org/apache/hudi/table/HoodieSparkTable.java | 5 +--
6 files changed, 64 insertions(+), 50 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b4958f5692..258894fd79 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -317,6 +317,8 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig
config, Configuration hadoopConf);
+ protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig
config, Configuration hadoopConf, HoodieTableMetaClient metaClient);
+
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata,
String actionType) {
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
@@ -1425,17 +1427,38 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
}
/**
- * Instantiates engine-specific instance of {@link HoodieTable} as well as
performs necessary
- * bootstrapping operations (for ex, validating whether Metadata Table has
to be bootstrapped)
+ * Performs necessary bootstrapping operations (for ex, validating whether
Metadata Table has to be bootstrapped).
*
- * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY
OPERATIONS
- * NOT REQUIRING EXTERNAL SYNCHRONIZATION
+ * <p>NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID
ANY OPERATIONS
+ * NOT REQUIRING EXTERNAL SYNCHRONIZATION
*
* @param metaClient instance of {@link HoodieTableMetaClient}
* @param instantTime current inflight instant time
- * @return instantiated {@link HoodieTable}
*/
- protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient,
Option<String> instantTime, boolean initialMetadataTableIfNecessary);
+ protected void doInitTable(HoodieTableMetaClient metaClient, Option<String>
instantTime, boolean initialMetadataTableIfNecessary) {
+ Option<HoodieInstant> ownerInstant = Option.empty();
+ if (instantTime.isPresent()) {
+ ownerInstant = Option.of(new HoodieInstant(true,
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()),
instantTime.get()));
+ }
+ this.txnManager.beginTransaction(ownerInstant, Option.empty());
+ try {
+ tryUpgrade(metaClient, instantTime);
+ if (initialMetadataTableIfNecessary) {
+ initMetadataTable(instantTime);
+ }
+ } finally {
+ this.txnManager.endTransaction(ownerInstant);
+ }
+ }
+
+ /**
+ * Bootstrap the metadata table.
+ *
+ * @param instantTime current inflight instant time
+ */
+ protected void initMetadataTable(Option<String> instantTime) {
+ // by default do nothing.
+ }
/**
* Instantiates and initializes instance of {@link HoodieTable}, performing
crucial bootstrapping
@@ -1457,18 +1480,8 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
setWriteSchemaForDeletes(metaClient);
}
- HoodieTable table;
- Option<HoodieInstant> ownerInstant = Option.empty();
- if (instantTime.isPresent()) {
- ownerInstant = Option.of(new HoodieInstant(true,
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()),
instantTime.get()));
- }
- this.txnManager.beginTransaction(ownerInstant, Option.empty());
- try {
- tryUpgrade(metaClient, instantTime);
- table = doInitTable(metaClient, instantTime,
initialMetadataTableIfNecessary);
- } finally {
- this.txnManager.endTransaction(ownerInstant);
- }
+ doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
+ HoodieTable table = createTable(config, hadoopConf, metaClient);
// Validate table properties
metaClient.validateTableProperties(config.getProps());
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 551b412ccb..3fbc3abbb8 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -126,6 +126,11 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}
+ @Override
+ protected HoodieTable createTable(HoodieWriteConfig config, Configuration
hadoopConf, HoodieTableMetaClient metaClient) {
+ return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context,
metaClient);
+ }
+
@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>>
hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -291,10 +296,14 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
HoodieFlinkTable<?> table = getHoodieTable();
if (config.isMetadataTableEnabled()) {
// initialize the metadata table path
- try (HoodieBackedTableMetadataWriter metadataWriter =
initMetadataWriter()) {
- // do nothing
+ // guard the metadata writer with concurrent lock
+ try {
+ this.txnManager.getLockManager().lock();
+ initMetadataWriter().close();
} catch (Exception e) {
throw new HoodieException("Failed to initialize metadata table", e);
+ } finally {
+ this.txnManager.getLockManager().unlock();
}
// clean the obsolete index stats
table.deleteMetadataIndexIfNecessary();
@@ -478,16 +487,13 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
}
@Override
- protected HoodieTable doInitTable(HoodieTableMetaClient metaClient,
Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
- // Create a Hoodie table which encapsulated the commits and files visible
- return getHoodieTable();
- }
-
- @Override
- protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String>
instantTime) {
+ protected void doInitTable(HoodieTableMetaClient metaClient, Option<String>
instantTime, boolean initialMetadataTableIfNecessary) {
// do nothing.
+
// flink executes the upgrade/downgrade once when initializing the first
instant on start up,
// no need to execute the upgrade/downgrade on each write in streaming.
+
+ // flink performs metadata table bootstrap on the coordinator when it
starts up.
}
public void completeTableService(
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index b6951bc6b7..4bb631e643 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -92,6 +92,11 @@ public class HoodieJavaWriteClient<T extends
HoodieRecordPayload> extends
return HoodieJavaTable.create(config, context);
}
+ @Override
+ protected HoodieTable createTable(HoodieWriteConfig config, Configuration
hadoopConf, HoodieTableMetaClient metaClient) {
+ return HoodieJavaTable.create(config, context, metaClient);
+ }
+
@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
String instantTime) {
@@ -228,13 +233,4 @@ public class HoodieJavaWriteClient<T extends
HoodieRecordPayload> extends
public HoodieWriteMetadata<List<WriteStatus>> cluster(final String
clusteringInstant, final boolean shouldComplete) {
throw new HoodieNotSupportedException("Cluster is not supported in
HoodieJavaClient");
}
-
- @Override
- protected HoodieTable doInitTable(HoodieTableMetaClient metaClient,
Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
- // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient,
HoodieTableVersion.current(), config, context, instantTime);
-
- // Create a Hoodie table which encapsulated the commits and files visible
- return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context,
metaClient);
- }
-
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 3c878cbc14..c33bf88e7e 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -51,7 +51,7 @@ public abstract class HoodieJavaTable<T extends
HoodieRecordPayload>
}
public static <T extends HoodieRecordPayload> HoodieJavaTable<T>
create(HoodieWriteConfig config,
-
HoodieJavaEngineContext context,
+
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 1f9fcf3ef9..9d70d43b62 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -131,6 +131,11 @@ public class SparkRDDWriteClient<T extends
HoodieRecordPayload> extends
return HoodieSparkTable.create(config, context);
}
+ @Override
+ protected HoodieTable createTable(HoodieWriteConfig config, Configuration
hadoopConf, HoodieTableMetaClient metaClient) {
+ return HoodieSparkTable.create(config, context, metaClient);
+ }
+
@Override
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>>
hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -434,16 +439,11 @@ public class SparkRDDWriteClient<T extends
HoodieRecordPayload> extends
}
@Override
- protected HoodieTable doInitTable(HoodieTableMetaClient metaClient,
Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
- if (initialMetadataTableIfNecessary) {
- // Initialize Metadata Table to make sure it's bootstrapped _before_ the
operation,
- // if it didn't exist before
- // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
- initializeMetadataTable(instantTime);
- }
-
- // Create a Hoodie table which encapsulated the commits and files visible
- return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context,
metaClient);
+ protected void initMetadataTable(Option<String> instantTime) {
+ // Initialize Metadata Table to make sure it's bootstrapped _before_ the
operation,
+ // if it didn't exist before
+ // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
+ initializeMetadataTable(instantTime);
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 66d51c9128..3719ed742b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -60,11 +59,11 @@ public abstract class HoodieSparkTable<T extends
HoodieRecordPayload>
.setLayoutVersion(Option.of(new
TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
- return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context,
metaClient);
+ return HoodieSparkTable.create(config, context, metaClient);
}
public static <T extends HoodieRecordPayload> HoodieSparkTable<T>
create(HoodieWriteConfig config,
-
HoodieSparkEngineContext context,
+
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {