This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b34d3fddbd [HUDI-6646] Add default lock provider for spark offline 
compaction and clustering service (#9365)
2b34d3fddbd is described below

commit 2b34d3fddbd757a9ff77dd0ad2c67f0d04b03e4f
Author: Danny Chan <[email protected]>
AuthorDate: Sun Aug 6 11:43:03 2023 +0800

    [HUDI-6646] Add default lock provider for spark offline compaction and 
clustering service (#9365)
    
    The default in-process lock provider does not work well for multi-engine 
interactions.
    Configure a file system lock provider which shares the same lock path 
across engines.
---
 .../lock/FileSystemBasedLockProvider.java          | 25 ++++++++++++
 .../org/apache/hudi/util/FlinkWriteClients.java    |  6 +--
 .../java/org/apache/hudi/util/StreamerUtil.java    |  8 ----
 .../scala/org/apache/hudi/HoodieCLIUtils.scala     |  9 ++++-
 .../procedures/RunClusteringProcedure.scala        |  9 ++++-
 .../procedures/RunCompactionProcedure.scala        |  8 ++++
 .../apache/hudi/utilities/HoodieClusteringJob.java |  4 ++
 .../org/apache/hudi/utilities/HoodieCompactor.java |  4 ++
 .../org/apache/hudi/utilities/UtilHelpers.java     |  8 ++++
 .../org/apache/hudi/utilities/TestUtilHelpers.java | 44 ++++++++++++++++++++++
 10 files changed, 110 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
index dc0f16ecb40..da7e71a2058 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.transaction.lock;
 
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.lock.LockProvider;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieLockException;
@@ -50,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static 
org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
 
 /**
  * A FileSystem based lock. This {@link LockProvider} implementation allows to 
lock table operations
@@ -201,4 +204,26 @@ public class FileSystemBasedLockProvider implements 
LockProvider<String>, Serial
           || config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), 
null) != null);
     
ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY)
 >= 0);
   }
+
+  /**
+   * Returns a filesystem based lock config with given table path.
+   */
+  public static TypedProperties getLockConfig(String tablePath) {
+    TypedProperties props = new TypedProperties();
+    props.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
FileSystemBasedLockProvider.class.getName());
+    props.put(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(), "2000");
+    props.put(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key(), "1");
+    props.put(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(), "30");
+    props.put(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key(), 
defaultLockPath(tablePath));
+    return props;
+  }
+
+  /**
+   * Returns the default lock file root path.
+   *
+   * <p>IMPORTANT: this path should be shared especially when there is engine 
cooperation.
+   */
+  private static String defaultLockPath(String tablePath) {
+    return tablePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 3e84e8493e3..6d8b0d0a7d6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -227,12 +227,8 @@ public class FlinkWriteClients {
     if (OptionsResolver.isLockRequired(conf) && 
!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
       // configure the fs lock provider by default
       builder.withLockConfig(HoodieLockConfig.newBuilder()
+          
.fromProperties(FileSystemBasedLockProvider.getLockConfig(conf.getString(FlinkOptions.PATH)))
           
.withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))
-          .withLockProvider(FileSystemBasedLockProvider.class)
-          .withLockWaitTimeInMillis(2000L) // 2s
-          .withFileSystemLockExpire(1) // 1 minute
-          .withClientNumRetries(30)
-          .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
           .build());
     }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 5342d7b4192..ce44ba04060 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -74,7 +74,6 @@ import static 
org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
-import static 
org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
 
 /**
  * Utilities for Flink stream read and write.
@@ -472,11 +471,4 @@ public class StreamerUtil {
         ? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a 
compaction
         : !ClusteringUtil.isClusteringInstant(instant, timeline);   // not a 
clustering
   }
-
-  /**
-   * Returns the auxiliary path.
-   */
-  public static String getAuxiliaryPath(Configuration conf) {
-    return conf.getString(FlinkOptions.PATH) + Path.SEPARATOR + 
AUXILIARYFOLDER_NAME;
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index fe2b09ed426..2c4fcc8e315 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -21,9 +21,9 @@ package org.apache.hudi
 
 import org.apache.hudi.avro.model.HoodieClusteringGroup
 import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
-
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
@@ -106,4 +106,11 @@ object HoodieCLIUtils {
       .map(pair => pair.get(0) -> pair.get(1))
       .toMap
   }
+
+  def getLockOptions(tablePath: String): Map[String, String] = {
+    val props = FileSystemBasedLockProvider.getLockConfig(tablePath)
+    props.stringPropertyNames.asScala
+      .map(key => key -> props.getString(key))
+      .toMap
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index f88cdcfc8ee..4394095d9a7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -23,8 +23,8 @@ import 
org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.common.util.{ClusteringUtils, HoodieTimer, Option => 
HOption}
-import org.apache.hudi.config.HoodieClusteringConfig
 import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieLockConfig}
 import org.apache.hudi.exception.HoodieClusteringException
 import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
 
@@ -140,6 +140,13 @@ class RunClusteringProcedure extends BaseProcedure
         logInfo("No options")
     }
 
+    if (metaClient.getTableConfig.isMetadataTableAvailable) {
+      if (!confs.contains(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key)) {
+        confs = confs ++ HoodieCLIUtils.getLockOptions(basePath)
+        logInfo("Auto config filesystem lock provider for metadata table")
+      }
+    }
+
     val pendingClusteringInstants = 
ClusteringUtils.getAllPendingClusteringPlans(metaClient)
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index c91686b84a9..338262dca95 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieTimeline}
 import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option => 
HOption}
+import org.apache.hudi.config.HoodieLockConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
 
@@ -83,6 +84,13 @@ class RunCompactionProcedure extends BaseProcedure with 
ProcedureBuilder with Sp
     val basePath = getBasePath(tableName, tablePath)
     val metaClient = 
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
+    if (metaClient.getTableConfig.isMetadataTableAvailable) {
+      if (!confs.contains(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key)) {
+        confs = confs ++ HoodieCLIUtils.getLockOptions(basePath)
+        logInfo("Auto config filesystem lock provider for metadata table")
+      }
+    }
+
     val pendingCompactionInstants = 
metaClient.getActiveTimeline.getWriteTimeline.getInstants.iterator().asScala
       .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
       .map(_.getTimestamp)
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 15273d701a8..9abeafb88fd 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -65,6 +65,10 @@ public class HoodieClusteringJob {
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
     this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+    if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
+      // add default lock config options if MDT is enabled.
+      UtilHelpers.addLockOptions(cfg.basePath, this.props);
+    }
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 603502affb6..e7213f93a55 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -69,6 +69,10 @@ public class HoodieCompactor {
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
     this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+    if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
+      // add default lock config options if MDT is enabled.
+      UtilHelpers.addLockOptions(cfg.basePath, this.props);
+    }
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 35a5c9fcb47..b74a43e2430 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -23,6 +23,7 @@ import org.apache.hudi.SparkJdbcUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
@@ -41,6 +42,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -578,6 +580,12 @@ public class UtilHelpers {
         .build();
   }
 
+  public static void addLockOptions(String basePath, TypedProperties props) {
+    if (!props.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
+      props.putAll(FileSystemBasedLockProvider.getLockConfig(basePath));
+    }
+  }
+
   @FunctionalInterface
   public interface CheckedSupplier<T> {
     T get() throws Throwable;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
new file mode 100644
index 00000000000..c7d1235424b
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieLockConfig;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link UtilHelpers}.
+ */
+public class TestUtilHelpers {
+  @Test
+  void testAddLockOptions() {
+    TypedProperties props1 = new TypedProperties();
+    UtilHelpers.addLockOptions("path1", props1);
+    assertEquals(FileSystemBasedLockProvider.class.getName(), 
props1.getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+    TypedProperties props2 = new TypedProperties();
+    props2.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), "Dummy");
+    UtilHelpers.addLockOptions("path2", props2);
+    assertEquals(1, props2.size(), "Should not add lock options if the lock 
provider is already there.");
+  }
+}

Reply via email to