This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b34d3fddbd [HUDI-6646] Add default lock provider for spark offline
compaction and clustering service (#9365)
2b34d3fddbd is described below
commit 2b34d3fddbd757a9ff77dd0ad2c67f0d04b03e4f
Author: Danny Chan <[email protected]>
AuthorDate: Sun Aug 6 11:43:03 2023 +0800
[HUDI-6646] Add default lock provider for spark offline compaction and
clustering service (#9365)
The default in progress lock provider does not work well for multi-engine
interactions.
Configure a file system lock provider that shares the same lock path across
engines.
---
.../lock/FileSystemBasedLockProvider.java | 25 ++++++++++++
.../org/apache/hudi/util/FlinkWriteClients.java | 6 +--
.../java/org/apache/hudi/util/StreamerUtil.java | 8 ----
.../scala/org/apache/hudi/HoodieCLIUtils.scala | 9 ++++-
.../procedures/RunClusteringProcedure.scala | 9 ++++-
.../procedures/RunCompactionProcedure.scala | 8 ++++
.../apache/hudi/utilities/HoodieClusteringJob.java | 4 ++
.../org/apache/hudi/utilities/HoodieCompactor.java | 4 ++
.../org/apache/hudi/utilities/UtilHelpers.java | 8 ++++
.../org/apache/hudi/utilities/TestUtilHelpers.java | 44 ++++++++++++++++++++++
10 files changed, 110 insertions(+), 15 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
index dc0f16ecb40..da7e71a2058 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.transaction.lock;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.lock.LockProvider;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLockException;
@@ -50,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static
org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
/**
* A FileSystem based lock. This {@link LockProvider} implementation allows to
lock table operations
@@ -201,4 +204,26 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
|| config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(),
null) != null);
ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY)
>= 0);
}
+
+ /**
+ * Returns a filesystem based lock config with given table path.
+ */
+ public static TypedProperties getLockConfig(String tablePath) {
+ TypedProperties props = new TypedProperties();
+ props.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
FileSystemBasedLockProvider.class.getName());
+ props.put(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(), "2000");
+ props.put(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key(), "1");
+ props.put(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(), "30");
+ props.put(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key(),
defaultLockPath(tablePath));
+ return props;
+ }
+
+ /**
+ * Returns the default lock file root path.
+ *
+ * <p>IMPORTANT: this path should be shared especially when there is engine
cooperation.
+ */
+ private static String defaultLockPath(String tablePath) {
+ return tablePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 3e84e8493e3..6d8b0d0a7d6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -227,12 +227,8 @@ public class FlinkWriteClients {
if (OptionsResolver.isLockRequired(conf) &&
!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
// configure the fs lock provider by default
builder.withLockConfig(HoodieLockConfig.newBuilder()
+
.fromProperties(FileSystemBasedLockProvider.getLockConfig(conf.getString(FlinkOptions.PATH)))
.withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))
- .withLockProvider(FileSystemBasedLockProvider.class)
- .withLockWaitTimeInMillis(2000L) // 2s
- .withFileSystemLockExpire(1) // 1 minute
- .withClientNumRetries(30)
- .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
.build());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 5342d7b4192..ce44ba04060 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -74,7 +74,6 @@ import static
org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
-import static
org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
/**
* Utilities for Flink stream read and write.
@@ -472,11 +471,4 @@ public class StreamerUtil {
? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a
compaction
: !ClusteringUtil.isClusteringInstant(instant, timeline); // not a
clustering
}
-
- /**
- * Returns the auxiliary path.
- */
- public static String getAuxiliaryPath(Configuration conf) {
- return conf.getString(FlinkOptions.PATH) + Path.SEPARATOR +
AUXILIARYFOLDER_NAME;
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index fe2b09ed426..2c4fcc8e315 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -21,9 +21,9 @@ package org.apache.hudi
import org.apache.hudi.avro.model.HoodieClusteringGroup
import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
-
import org.apache.spark.SparkException
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
@@ -106,4 +106,11 @@ object HoodieCLIUtils {
.map(pair => pair.get(0) -> pair.get(1))
.toMap
}
+
+ def getLockOptions(tablePath: String): Map[String, String] = {
+ val props = FileSystemBasedLockProvider.getLockConfig(tablePath)
+ props.stringPropertyNames.asScala
+ .map(key => key -> props.getString(key))
+ .toMap
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index f88cdcfc8ee..4394095d9a7 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -23,8 +23,8 @@ import
org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, HoodieTimer, Option =>
HOption}
-import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieLockConfig}
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
@@ -140,6 +140,13 @@ class RunClusteringProcedure extends BaseProcedure
logInfo("No options")
}
+ if (metaClient.getTableConfig.isMetadataTableAvailable) {
+ if (!confs.contains(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key)) {
+ confs = confs ++ HoodieCLIUtils.getLockOptions(basePath)
+ logInfo("Auto config filesystem lock provider for metadata table")
+ }
+ }
+
val pendingClusteringInstants =
ClusteringUtils.getAllPendingClusteringPlans(metaClient)
.iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index c91686b84a9..338262dca95 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline,
HoodieTimeline}
import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option =>
HOption}
+import org.apache.hudi.config.HoodieLockConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
@@ -83,6 +84,13 @@ class RunCompactionProcedure extends BaseProcedure with
ProcedureBuilder with Sp
val basePath = getBasePath(tableName, tablePath)
val metaClient =
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ if (metaClient.getTableConfig.isMetadataTableAvailable) {
+ if (!confs.contains(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key)) {
+ confs = confs ++ HoodieCLIUtils.getLockOptions(basePath)
+ logInfo("Auto config filesystem lock provider for metadata table")
+ }
+ }
+
val pendingCompactionInstants =
metaClient.getActiveTimeline.getWriteTimeline.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.map(_.getTimestamp)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 15273d701a8..9abeafb88fd 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -65,6 +65,10 @@ public class HoodieClusteringJob {
// Disable async cleaning, will trigger synchronous cleaning manually.
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
+ // add default lock config options if MDT is enabled.
+ UtilHelpers.addLockOptions(cfg.basePath, this.props);
+ }
}
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 603502affb6..e7213f93a55 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -69,6 +69,10 @@ public class HoodieCompactor {
// Disable async cleaning, will trigger synchronous cleaning manually.
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+ if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
+ // add default lock config options if MDT is enabled.
+ UtilHelpers.addLockOptions(cfg.basePath, this.props);
+ }
}
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 35a5c9fcb47..b74a43e2430 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -23,6 +23,7 @@ import org.apache.hudi.SparkJdbcUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
@@ -41,6 +42,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -578,6 +580,12 @@ public class UtilHelpers {
.build();
}
+ public static void addLockOptions(String basePath, TypedProperties props) {
+ if (!props.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
+ props.putAll(FileSystemBasedLockProvider.getLockConfig(basePath));
+ }
+ }
+
@FunctionalInterface
public interface CheckedSupplier<T> {
T get() throws Throwable;
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
new file mode 100644
index 00000000000..c7d1235424b
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieLockConfig;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link UtilHelpers}.
+ */
+public class TestUtilHelpers {
+ @Test
+ void testAddLockOptions() {
+ TypedProperties props1 = new TypedProperties();
+ UtilHelpers.addLockOptions("path1", props1);
+ assertEquals(FileSystemBasedLockProvider.class.getName(),
props1.getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+ TypedProperties props2 = new TypedProperties();
+ props2.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), "Dummy");
+ UtilHelpers.addLockOptions("path2", props2);
+ assertEquals(1, props2.size(), "Should not add lock options if the lock
provider is already there.");
+ }
+}