This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dcd5a8182a1 [HUDI-7069] Optimize metaclient construction and include 
table config options (#10048)
dcd5a8182a1 is described below

commit dcd5a8182a11faab8bfc1ce8aa7787fa590dd395
Author: majian <[email protected]>
AuthorDate: Wed Nov 15 16:10:15 2023 +0800

    [HUDI-7069] Optimize metaclient construction and include table config 
options (#10048)
---
 .../apache/hudi/utilities/HoodieClusteringJob.java    |  8 ++++----
 .../org/apache/hudi/utilities/HoodieCompactor.java    | 11 ++++++-----
 .../hudi/utilities/multitable/ClusteringTask.java     | 19 ++++++++++++++++++-
 .../hudi/utilities/multitable/CompactionTask.java     | 19 ++++++++++++++++++-
 .../utilities/multitable/MultiTableServiceUtils.java  | 16 ++++++++++++----
 5 files changed, 58 insertions(+), 15 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index d0c1535a43c..5cb6b9bbb15 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -56,17 +56,17 @@ public class HoodieClusteringJob {
   private HoodieTableMetaClient metaClient;
 
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
-    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), 
cfg.propsFilePath, cfg.configs));
+    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), 
cfg.propsFilePath, cfg.configs),
+        UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
   }
 
-  public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties 
props) {
+  public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties 
props, HoodieTableMetaClient metaClient) {
     this.cfg = cfg;
     this.jsc = jsc;
     this.props = props;
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+    this.metaClient = metaClient;
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
     if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
       // add default lock config options if MDT is enabled.
       UtilHelpers.addLockOptions(cfg.basePath, this.props);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index c1464c2fe2c..9b03cb7a724 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -56,17 +56,18 @@ public class HoodieCompactor {
   private transient FileSystem fs;
   private TypedProperties props;
   private final JavaSparkContext jsc;
-  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetaClient metaClient;
 
   public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
-    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), 
cfg.propsFilePath, cfg.configs));
+    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), 
cfg.propsFilePath, cfg.configs),
+        UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
   }
 
-  public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties 
props) {
+  public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties 
props, HoodieTableMetaClient metaClient) {
     this.cfg = cfg;
     this.jsc = jsc;
     this.props = props;
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+    this.metaClient = metaClient;
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
     if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
@@ -256,7 +257,7 @@ public class HoodieCompactor {
       // If no compaction instant is provided by --instant-time, find the 
earliest scheduled compaction
       // instant from the active timeline
       if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
-        HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, 
cfg.basePath, true);
+        metaClient = HoodieTableMetaClient.reload(metaClient);
         Option<HoodieInstant> firstCompactionInstant = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
         if (firstCompactionInstant.isPresent()) {
           cfg.compactionInstantTime = 
firstCompactionInstant.get().getTimestamp();
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
index 532c59e9725..e20d71e8cc9 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.utilities.multitable;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -43,13 +44,18 @@ class ClusteringTask extends TableServiceTask {
    */
   private String clusteringMode;
 
+  /**
+   * Meta Client.
+   */
+  private HoodieTableMetaClient metaClient;
+
   @Override
   void run() {
     HoodieClusteringJob.Config clusteringConfig = new 
HoodieClusteringJob.Config();
     clusteringConfig.basePath = basePath;
     clusteringConfig.parallelism = parallelism;
     clusteringConfig.runningMode = clusteringMode;
-    new HoodieClusteringJob(jsc, clusteringConfig, props).cluster(retry);
+    new HoodieClusteringJob(jsc, clusteringConfig, props, 
metaClient).cluster(retry);
   }
 
   /**
@@ -98,6 +104,11 @@ class ClusteringTask extends TableServiceTask {
      */
     private int retry;
 
+    /**
+     * Meta Client.
+     */
+    private HoodieTableMetaClient metaClient;
+
     private Builder() {
     }
 
@@ -131,6 +142,11 @@ class ClusteringTask extends TableServiceTask {
       return this;
     }
 
+    public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
     public ClusteringTask build() {
       ClusteringTask clusteringTask = new ClusteringTask();
       clusteringTask.jsc = this.jsc;
@@ -139,6 +155,7 @@ class ClusteringTask extends TableServiceTask {
       clusteringTask.retry = this.retry;
       clusteringTask.basePath = this.basePath;
       clusteringTask.props = this.props;
+      clusteringTask.metaClient = this.metaClient;
       return clusteringTask;
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
index 4ee3e14a661..25b80e7cd45 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.utilities.multitable;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.utilities.HoodieCompactor;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -48,6 +49,11 @@ class CompactionTask extends TableServiceTask {
    */
   private int parallelism;
 
+  /**
+   * Meta Client.
+   */
+  private HoodieTableMetaClient metaClient;
+
   @Override
   void run() {
     HoodieCompactor.Config compactionCfg = new HoodieCompactor.Config();
@@ -56,7 +62,7 @@ class CompactionTask extends TableServiceTask {
     compactionCfg.runningMode = compactionRunningMode;
     compactionCfg.parallelism = parallelism;
     compactionCfg.retry = retry;
-    new HoodieCompactor(jsc, compactionCfg, props).compact(retry);
+    new HoodieCompactor(jsc, compactionCfg, props, metaClient).compact(retry);
   }
 
   /**
@@ -109,6 +115,11 @@ class CompactionTask extends TableServiceTask {
      */
     private JavaSparkContext jsc;
 
+    /**
+     * Meta Client.
+     */
+    private HoodieTableMetaClient metaClient;
+
     public Builder withProps(TypedProperties props) {
       this.props = props;
       return this;
@@ -144,6 +155,11 @@ class CompactionTask extends TableServiceTask {
       return this;
     }
 
+    public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
     public CompactionTask build() {
       CompactionTask compactionTask = new CompactionTask();
       compactionTask.basePath = this.basePath;
@@ -153,6 +169,7 @@ class CompactionTask extends TableServiceTask {
       compactionTask.compactionStrategyName = this.compactionStrategyName;
       compactionTask.retry = this.retry;
       compactionTask.props = this.props;
+      compactionTask.metaClient = this.metaClient;
       return compactionTask;
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
index f9a761ea6b8..f600db65733 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
@@ -22,9 +22,11 @@ package org.apache.hudi.utilities.multitable;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.utilities.UtilHelpers;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -166,6 +168,10 @@ public class MultiTableServiceUtils {
                                                                
HoodieMultiTableServicesMain.Config cfg,
                                                                TypedProperties 
props) {
     TableServicePipeline pipeline = new TableServicePipeline();
+    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, 
basePath, true);
+    TypedProperties propsWithTableConfig = new 
TypedProperties(metaClient.getTableConfig().getProps());
+    propsWithTableConfig.putAll(props);
+
     if (cfg.enableCompaction) {
       pipeline.add(CompactionTask.newBuilder()
           .withJsc(jsc)
@@ -173,8 +179,9 @@ public class MultiTableServiceUtils {
           .withParallelism(cfg.parallelism)
           .withCompactionRunningMode(cfg.compactionRunningMode)
           .withCompactionStrategyName(cfg.compactionStrategyClassName)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .withRetry(cfg.retry)
+          .withMetaclient(metaClient)
           .build());
     }
     if (cfg.enableClustering) {
@@ -183,8 +190,9 @@ public class MultiTableServiceUtils {
           .withJsc(jsc)
           .withParallelism(cfg.parallelism)
           .withClusteringRunningMode(cfg.clusteringRunningMode)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .withRetry(cfg.retry)
+          .withMetaclient(metaClient)
           .build());
     }
     if (cfg.enableClean) {
@@ -192,14 +200,14 @@ public class MultiTableServiceUtils {
           .withBasePath(basePath)
           .withJsc(jsc)
           .withRetry(cfg.retry)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .build());
     }
     if (cfg.enableArchive) {
       pipeline.add(ArchiveTask.newBuilder()
           .withBasePath(basePath)
           .withJsc(jsc)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .withRetry(cfg.retry)
           .build());
     }

Reply via email to the [email protected] mailing list.