This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dcd5a8182a1 [HUDI-7069] Optimize metaclient construction and include table config options (#10048)
dcd5a8182a1 is described below
commit dcd5a8182a11faab8bfc1ce8aa7787fa590dd395
Author: majian <[email protected]>
AuthorDate: Wed Nov 15 16:10:15 2023 +0800
[HUDI-7069] Optimize metaclient construction and include table config options (#10048)
---
.../apache/hudi/utilities/HoodieClusteringJob.java | 8 ++++----
.../org/apache/hudi/utilities/HoodieCompactor.java | 11 ++++++-----
.../hudi/utilities/multitable/ClusteringTask.java | 19 ++++++++++++++++++-
.../hudi/utilities/multitable/CompactionTask.java | 19 ++++++++++++++++++-
.../utilities/multitable/MultiTableServiceUtils.java | 16 ++++++++++++----
5 files changed, 58 insertions(+), 15 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index d0c1535a43c..5cb6b9bbb15 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -56,17 +56,17 @@ public class HoodieClusteringJob {
private HoodieTableMetaClient metaClient;
public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
- this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+ this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs),
+ UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
}
- public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props) {
+ public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) {
this.cfg = cfg;
this.jsc = jsc;
this.props = props;
- this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ this.metaClient = metaClient;
// Disable async cleaning, will trigger synchronous cleaning manually.
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
- this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
// add default lock config options if MDT is enabled.
UtilHelpers.addLockOptions(cfg.basePath, this.props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index c1464c2fe2c..9b03cb7a724 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -56,17 +56,18 @@ public class HoodieCompactor {
private transient FileSystem fs;
private TypedProperties props;
private final JavaSparkContext jsc;
- private final HoodieTableMetaClient metaClient;
+ private HoodieTableMetaClient metaClient;
public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
- this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+ this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs),
+ UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
}
- public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props) {
+ public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) {
this.cfg = cfg;
this.jsc = jsc;
this.props = props;
- this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ this.metaClient = metaClient;
// Disable async cleaning, will trigger synchronous cleaning manually.
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
@@ -256,7 +257,7 @@ public class HoodieCompactor {
// If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
// instant from the active timeline
if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
- HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
Option<HoodieInstant> firstCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
if (firstCompactionInstant.isPresent()) {
cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
index 532c59e9725..e20d71e8cc9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
@@ -20,6 +20,7 @@
package org.apache.hudi.utilities.multitable;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.spark.api.java.JavaSparkContext;
@@ -43,13 +44,18 @@ class ClusteringTask extends TableServiceTask {
*/
private String clusteringMode;
+ /**
+ * Meta Client.
+ */
+ private HoodieTableMetaClient metaClient;
+
@Override
void run() {
HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config();
clusteringConfig.basePath = basePath;
clusteringConfig.parallelism = parallelism;
clusteringConfig.runningMode = clusteringMode;
- new HoodieClusteringJob(jsc, clusteringConfig, props).cluster(retry);
+ new HoodieClusteringJob(jsc, clusteringConfig, props, metaClient).cluster(retry);
}
/**
@@ -98,6 +104,11 @@ class ClusteringTask extends TableServiceTask {
*/
private int retry;
+ /**
+ * Meta Client.
+ */
+ private HoodieTableMetaClient metaClient;
+
private Builder() {
}
@@ -131,6 +142,11 @@ class ClusteringTask extends TableServiceTask {
return this;
}
+ public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+ this.metaClient = metaClient;
+ return this;
+ }
+
public ClusteringTask build() {
ClusteringTask clusteringTask = new ClusteringTask();
clusteringTask.jsc = this.jsc;
@@ -139,6 +155,7 @@ class ClusteringTask extends TableServiceTask {
clusteringTask.retry = this.retry;
clusteringTask.basePath = this.basePath;
clusteringTask.props = this.props;
+ clusteringTask.metaClient = this.metaClient;
return clusteringTask;
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
index 4ee3e14a661..25b80e7cd45 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
@@ -20,6 +20,7 @@
package org.apache.hudi.utilities.multitable;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.spark.api.java.JavaSparkContext;
@@ -48,6 +49,11 @@ class CompactionTask extends TableServiceTask {
*/
private int parallelism;
+ /**
+ * Meta Client.
+ */
+ private HoodieTableMetaClient metaClient;
+
@Override
void run() {
HoodieCompactor.Config compactionCfg = new HoodieCompactor.Config();
@@ -56,7 +62,7 @@ class CompactionTask extends TableServiceTask {
compactionCfg.runningMode = compactionRunningMode;
compactionCfg.parallelism = parallelism;
compactionCfg.retry = retry;
- new HoodieCompactor(jsc, compactionCfg, props).compact(retry);
+ new HoodieCompactor(jsc, compactionCfg, props, metaClient).compact(retry);
}
/**
@@ -109,6 +115,11 @@ class CompactionTask extends TableServiceTask {
*/
private JavaSparkContext jsc;
+ /**
+ * Meta Client.
+ */
+ private HoodieTableMetaClient metaClient;
+
public Builder withProps(TypedProperties props) {
this.props = props;
return this;
@@ -144,6 +155,11 @@ class CompactionTask extends TableServiceTask {
return this;
}
+ public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+ this.metaClient = metaClient;
+ return this;
+ }
+
public CompactionTask build() {
CompactionTask compactionTask = new CompactionTask();
compactionTask.basePath = this.basePath;
@@ -153,6 +169,7 @@ class CompactionTask extends TableServiceTask {
compactionTask.compactionStrategyName = this.compactionStrategyName;
compactionTask.retry = this.retry;
compactionTask.props = this.props;
+ compactionTask.metaClient = this.metaClient;
return compactionTask;
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
index f9a761ea6b8..f600db65733 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
@@ -22,9 +22,11 @@ package org.apache.hudi.utilities.multitable;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -166,6 +168,10 @@ public class MultiTableServiceUtils {
HoodieMultiTableServicesMain.Config cfg,
TypedProperties props) {
TableServicePipeline pipeline = new TableServicePipeline();
+ HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true);
+ TypedProperties propsWithTableConfig = new TypedProperties(metaClient.getTableConfig().getProps());
+ propsWithTableConfig.putAll(props);
+
if (cfg.enableCompaction) {
pipeline.add(CompactionTask.newBuilder()
.withJsc(jsc)
@@ -173,8 +179,9 @@ public class MultiTableServiceUtils {
.withParallelism(cfg.parallelism)
.withCompactionRunningMode(cfg.compactionRunningMode)
.withCompactionStrategyName(cfg.compactionStrategyClassName)
- .withProps(props)
+ .withProps(propsWithTableConfig)
.withRetry(cfg.retry)
+ .withMetaclient(metaClient)
.build());
}
if (cfg.enableClustering) {
@@ -183,8 +190,9 @@ public class MultiTableServiceUtils {
.withJsc(jsc)
.withParallelism(cfg.parallelism)
.withClusteringRunningMode(cfg.clusteringRunningMode)
- .withProps(props)
+ .withProps(propsWithTableConfig)
.withRetry(cfg.retry)
+ .withMetaclient(metaClient)
.build());
}
if (cfg.enableClean) {
@@ -192,14 +200,14 @@ public class MultiTableServiceUtils {
.withBasePath(basePath)
.withJsc(jsc)
.withRetry(cfg.retry)
- .withProps(props)
+ .withProps(propsWithTableConfig)
.build());
}
if (cfg.enableArchive) {
pipeline.add(ArchiveTask.newBuilder()
.withBasePath(basePath)
.withJsc(jsc)
- .withProps(props)
+ .withProps(propsWithTableConfig)
.withRetry(cfg.retry)
.build());
}