This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.12.2-blockers-candidate
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to 
refs/heads/release-0.12.2-blockers-candidate by this push:
     new 211af1a4fd [HUDI-5414] No need to guard the table initialization by 
lock for HoodieFlinkWriteClient (#7509) (#7522)
211af1a4fd is described below

commit 211af1a4fd76ce84ce80f4d1b2befe5fc9954888
Author: Danny Chan <[email protected]>
AuthorDate: Wed Dec 21 12:10:34 2022 +0800

    [HUDI-5414] No need to guard the table initialization by lock for 
HoodieFlinkWriteClient (#7509) (#7522)
    
    Different from other write clients, HoodieFlinkWriteClient invokes the 
dataset writing methods (#upsert or #insert)
    for each batch of the new data set in the long-running task. In the current impl, an 
engine-specific hoodie table would be created before performing
    these actions, and before the table creation, some table bootstrapping 
operations are performed (such as table upgrade/downgrade, the metadata table
    bootstrap). These bootstrapping operations are guarded by a transaction 
lock.
    
    In Flink, these bootstrapping operations can be avoided because they are 
all performed only once on the coordinator.
    
    The changes:
    
    - Make BaseHoodieWriteClient#doInitTable non-abstract; it now only performs 
the bootstrapping operations
    - Add a default impl BaseHoodieWriteClient#initMetadataTable for metadata 
table bootstrap specifically
    - Add a new abstract method for creating engine-specific hoodie table
    
    (cherry picked from commit fd62a1413e74de686935672aec812aacd5c43a63)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 49 ++++++++++++++--------
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 24 +++++++----
 .../apache/hudi/client/HoodieJavaWriteClient.java  | 14 +++----
 .../org/apache/hudi/table/HoodieJavaTable.java     |  2 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    | 20 ++++-----
 .../org/apache/hudi/table/HoodieSparkTable.java    |  5 +--
 6 files changed, 64 insertions(+), 50 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b4958f5692..258894fd79 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -317,6 +317,8 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
 
   protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig 
config, Configuration hadoopConf);
 
+  protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig 
config, Configuration hadoopConf, HoodieTableMetaClient metaClient);
+
   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, 
String actionType) {
     if (writeTimer != null) {
       long durationInMs = metrics.getDurationInMs(writeTimer.stop());
@@ -1425,17 +1427,38 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
   }
 
   /**
-   * Instantiates engine-specific instance of {@link HoodieTable} as well as 
performs necessary
-   * bootstrapping operations (for ex, validating whether Metadata Table has 
to be bootstrapped)
+   * Performs necessary bootstrapping operations (for ex, validating whether 
Metadata Table has to be bootstrapped).
    *
-   * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY 
OPERATIONS
-   *       NOT REQUIRING EXTERNAL SYNCHRONIZATION
+   * <p>NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID 
ANY OPERATIONS
+   *          NOT REQUIRING EXTERNAL SYNCHRONIZATION
    *
    * @param metaClient instance of {@link HoodieTableMetaClient}
    * @param instantTime current inflight instant time
-   * @return instantiated {@link HoodieTable}
    */
-  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, 
Option<String> instantTime, boolean initialMetadataTableIfNecessary);
+  protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> 
instantTime, boolean initialMetadataTableIfNecessary) {
+    Option<HoodieInstant> ownerInstant = Option.empty();
+    if (instantTime.isPresent()) {
+      ownerInstant = Option.of(new HoodieInstant(true, 
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), 
instantTime.get()));
+    }
+    this.txnManager.beginTransaction(ownerInstant, Option.empty());
+    try {
+      tryUpgrade(metaClient, instantTime);
+      if (initialMetadataTableIfNecessary) {
+        initMetadataTable(instantTime);
+      }
+    } finally {
+      this.txnManager.endTransaction(ownerInstant);
+    }
+  }
+
+  /**
+   * Bootstrap the metadata table.
+   *
+   * @param instantTime current inflight instant time
+   */
+  protected void initMetadataTable(Option<String> instantTime) {
+    // by default do nothing.
+  }
 
   /**
    * Instantiates and initializes instance of {@link HoodieTable}, performing 
crucial bootstrapping
@@ -1457,18 +1480,8 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
       setWriteSchemaForDeletes(metaClient);
     }
 
-    HoodieTable table;
-    Option<HoodieInstant> ownerInstant = Option.empty();
-    if (instantTime.isPresent()) {
-      ownerInstant = Option.of(new HoodieInstant(true, 
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), 
instantTime.get()));
-    }
-    this.txnManager.beginTransaction(ownerInstant, Option.empty());
-    try {
-      tryUpgrade(metaClient, instantTime);
-      table = doInitTable(metaClient, instantTime, 
initialMetadataTableIfNecessary);
-    } finally {
-      this.txnManager.endTransaction(ownerInstant);
-    }
+    doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
+    HoodieTable table = createTable(config, hadoopConf, metaClient);
 
     // Validate table properties
     metaClient.validateTableProperties(config.getProps());
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 551b412ccb..3fbc3abbb8 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -126,6 +126,11 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
   }
 
+  @Override
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration 
hadoopConf, HoodieTableMetaClient metaClient) {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, 
metaClient);
+  }
+
   @Override
   public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> 
hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
@@ -291,10 +296,14 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     HoodieFlinkTable<?> table = getHoodieTable();
     if (config.isMetadataTableEnabled()) {
       // initialize the metadata table path
-      try (HoodieBackedTableMetadataWriter metadataWriter = 
initMetadataWriter()) {
-        // do nothing
+      // guard the metadata writer with concurrent lock
+      try {
+        this.txnManager.getLockManager().lock();
+        initMetadataWriter().close();
       } catch (Exception e) {
         throw new HoodieException("Failed to initialize metadata table", e);
+      } finally {
+        this.txnManager.getLockManager().unlock();
       }
       // clean the obsolete index stats
       table.deleteMetadataIndexIfNecessary();
@@ -478,16 +487,13 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, 
Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
-    // Create a Hoodie table which encapsulated the commits and files visible
-    return getHoodieTable();
-  }
-
-  @Override
-  protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> 
instantTime) {
+  protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> 
instantTime, boolean initialMetadataTableIfNecessary) {
     // do nothing.
+
     // flink executes the upgrade/downgrade once when initializing the first 
instant on start up,
     // no need to execute the upgrade/downgrade on each write in streaming.
+
+    // flink performs metadata table bootstrap on the coordinator when it 
starts up.
   }
 
   public void completeTableService(
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index b6951bc6b7..4bb631e643 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -92,6 +92,11 @@ public class HoodieJavaWriteClient<T extends 
HoodieRecordPayload> extends
     return HoodieJavaTable.create(config, context);
   }
 
+  @Override
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration 
hadoopConf, HoodieTableMetaClient metaClient) {
+    return HoodieJavaTable.create(config, context, metaClient);
+  }
+
   @Override
   public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
                                   String instantTime) {
@@ -228,13 +233,4 @@ public class HoodieJavaWriteClient<T extends 
HoodieRecordPayload> extends
   public HoodieWriteMetadata<List<WriteStatus>> cluster(final String 
clusteringInstant, final boolean shouldComplete) {
     throw new HoodieNotSupportedException("Cluster is not supported in 
HoodieJavaClient");
   }
-
-  @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, 
Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
-    // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, 
HoodieTableVersion.current(), config, context, instantTime);
-
-    // Create a Hoodie table which encapsulated the commits and files visible
-    return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, 
metaClient);
-  }
-
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 3c878cbc14..c33bf88e7e 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -51,7 +51,7 @@ public abstract class HoodieJavaTable<T extends 
HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieJavaTable<T> 
create(HoodieWriteConfig config,
-                                                                          
HoodieJavaEngineContext context,
+                                                                          
HoodieEngineContext context,
                                                                           
HoodieTableMetaClient metaClient) {
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 1f9fcf3ef9..9d70d43b62 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -131,6 +131,11 @@ public class SparkRDDWriteClient<T extends 
HoodieRecordPayload> extends
     return HoodieSparkTable.create(config, context);
   }
 
+  @Override
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration 
hadoopConf, HoodieTableMetaClient metaClient) {
+    return HoodieSparkTable.create(config, context, metaClient);
+  }
+
   @Override
   public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> 
hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
@@ -434,16 +439,11 @@ public class SparkRDDWriteClient<T extends 
HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, 
Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
-    if (initialMetadataTableIfNecessary) {
-      // Initialize Metadata Table to make sure it's bootstrapped _before_ the 
operation,
-      // if it didn't exist before
-      // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
-      initializeMetadataTable(instantTime);
-    }
-
-    // Create a Hoodie table which encapsulated the commits and files visible
-    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, 
metaClient);
+  protected void initMetadataTable(Option<String> instantTime) {
+    // Initialize Metadata Table to make sure it's bootstrapped _before_ the 
operation,
+    // if it didn't exist before
+    // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
+    initializeMetadataTable(instantTime);
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 66d51c9128..3719ed742b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
@@ -60,11 +59,11 @@ public abstract class HoodieSparkTable<T extends 
HoodieRecordPayload>
             .setLayoutVersion(Option.of(new 
TimelineLayoutVersion(config.getTimelineLayoutVersion())))
             .setFileSystemRetryConfig(config.getFileSystemRetryConfig())
             .setProperties(config.getProps()).build();
-    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, 
metaClient);
+    return HoodieSparkTable.create(config, context, metaClient);
   }
 
   public static <T extends HoodieRecordPayload> HoodieSparkTable<T> 
create(HoodieWriteConfig config,
-                                                                           
HoodieSparkEngineContext context,
+                                                                           
HoodieEngineContext context,
                                                                            
HoodieTableMetaClient metaClient) {
     HoodieSparkTable<T> hoodieSparkTable;
     switch (metaClient.getTableType()) {

Reply via email to