This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7dcccd6073938f96f0ba6ebb2e68828554d75bfe
Author: Lin Liu <[email protected]>
AuthorDate: Sun Oct 26 20:46:19 2025 -0700

    fix: Give proper error message for multi-writer scenarios without lock 
provider set (#14119)
    
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../apache/hudi/index/HoodieSparkIndexClient.java  | 25 ++++++----------------
 .../TestPartitionStatsIndexWithSql.scala           |  4 ++--
 .../functional/TestRecordLevelIndexWithSQL.scala   |  2 ++
 .../functional/TestSecondaryIndexDataTypes.scala   |  5 +++++
 .../functional/TestSecondaryIndexPruning.scala     |  8 ++++++-
 .../hudi/feature/index/TestExpressionIndex.scala   |  1 +
 .../TestHoodieBackedTableMetadataIndexLookup.scala |  1 +
 .../sql/hudi/feature/index/TestIndexSyntax.scala   |  4 ++++
 .../hudi/feature/index/TestSecondaryIndex.scala    |  4 ++++
 .../TestHoodieMetadataTableValidator.java          |  3 +++
 10 files changed, 36 insertions(+), 21 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
index ad2f909ef63c..c2c0854f1233 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
@@ -21,8 +21,6 @@ package org.apache.hudi.index;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
-import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieIndexingConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
@@ -35,6 +33,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -43,7 +42,6 @@ import org.apache.hudi.exception.HoodieMetadataIndexException;
 import org.apache.hudi.index.record.HoodieRecordIndex;
 import org.apache.hudi.metadata.HoodieIndexVersion;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StorageSchemes;
 import org.apache.hudi.table.action.index.BaseHoodieIndexClient;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -51,7 +49,6 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -222,6 +219,12 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
           .withEngineType(EngineType.SPARK)
           .withProps(configs)
           .build();
+      // Validate if a lock provider class is set properly.
+      if (localWriteConfig.getWriteConcurrencyMode().supportsMultiWriter() && 
StringUtils.isNullOrEmpty(localWriteConfig.getLockProviderClass())) {
+        throw new IllegalArgumentException(
+            "To create index asynchronously, multi-writer configurations need 
to be enabled and hence 'hoodie.write.lock.provider' is expected to be set for 
such cases. "
+                + "For single writer mode, feel free to set the config value 
to org.apache.hudi.client.transaction.lock.InProcessLockProvider and retry 
index creation");
+      }
       return new SparkRDDWriteClient(engineContextOpt.get(), localWriteConfig, 
Option.empty());
     } catch (Exception e) {
       throw new HoodieException("Failed to create write client while 
performing index operation ", e);
@@ -259,8 +262,6 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
     Map<String, String> writeConfig = new HashMap<>();
     if (metaClient.getTableConfig().isMetadataTableAvailable()) {
       writeConfig.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
-      writeConfig.putAll(getLockOptions(metaClient.getBasePath().toString(),
-          metaClient.getBasePath().toUri().getScheme(), new 
TypedProperties()));
 
       // [HUDI-7472] Ensure write-config contains the existing MDT partition 
to prevent those from getting deleted
       
metaClient.getTableConfig().getMetadataPartitions().forEach(partitionPath -> {
@@ -289,16 +290,4 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
         
HoodieIndexingConfig.fromIndexDefinition(indexDefinition).getProps().forEach((key,
 value) -> writeConfig.put(key.toString(), value.toString())));
     return writeConfig;
   }
-
-  static Map<String, String> getLockOptions(String tablePath, String scheme, 
TypedProperties lockConfig) {
-    List<String> customSupportedFSs = 
lockConfig.getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(),
 ",", new ArrayList<String>());
-    if (scheme == null || customSupportedFSs.contains(scheme) || 
StorageSchemes.isAtomicCreationSupported(scheme)) {
-      TypedProperties props = 
FileSystemBasedLockProvider.getLockConfig(tablePath);
-      Map<String, String> toReturn = new HashMap<>();
-      props.stringPropertyNames().stream().forEach(key -> toReturn.put(key, 
props.getString(key)));
-      return toReturn;
-    } else {
-      return Collections.emptyMap();
-    }
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 25a7fea6e205..6629b9523541 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -44,9 +44,9 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
 
   val sqlTempTable = "hudi_tbl"
 
-  @BeforeAll
-  def init(): Unit = {
+  override protected def beforeAll(): Unit = {
     initQueryIndexConf()
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
   }
 
   test("Test drop partition stats index") {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
index 2e4a83ff0b4d..131da859edd0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
@@ -54,6 +54,7 @@ class TestRecordLevelIndexWithSQL extends 
RecordLevelIndexTestBase {
       "hoodie.metadata.index.column.stats.enable" -> "false",
       HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "false")
 
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite,
@@ -65,6 +66,7 @@ class TestRecordLevelIndexWithSQL extends 
RecordLevelIndexTestBase {
     metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(HoodieTestUtils.getDefaultStorageConf).build()
     assertEquals(isPartitioned, 
HoodieRecordIndex.isPartitioned(metaClient.getIndexMetadata.get().getIndex(MetadataPartitionType.RECORD_INDEX.getPartitionPath).get()))
     
assertTrue(MetadataPartitionType.RECORD_INDEX.isMetadataPartitionAvailable(getLatestMetaClient(true)))
+    spark.sql("set hoodie.write.lock.provider=")
     spark.sql(s"drop table $sqlTempTable")
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala
index 1305f0f26d94..d59038ec0d40 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala
@@ -34,6 +34,11 @@ import org.junit.jupiter.api.Tag
  */
 @Tag("functional")
 class TestSecondaryIndexDataTypes extends HoodieSparkSqlTestBase {
+
+  override protected def beforeAll(): Unit = {
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+  }
+
   /**
    * Test secondary index creation with all data types and verify query 
behavior.
    *
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 2bbaf74ce588..79ade67dc3fa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -46,7 +46,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
 import org.apache.spark.sql.types.StringType
-import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.{BeforeEach, Tag, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, 
MethodSource, ValueSource}
@@ -86,6 +86,12 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
 
   override def conf: SparkConf = conf(getSparkSqlConf)
 
+  @BeforeEach
+  override def runBeforeEach(): Unit = {
+    super.runBeforeEach()
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+  }
+
   @Test
   def testSecondaryIndexWithoutRecordIndex(): Unit = {
     tableName += "test_secondary_index_without_rli"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
index abdb72fac284..e7a75fbc2a6a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala
@@ -68,6 +68,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase with 
SparkAdapterSuppor
 
   override protected def beforeAll(): Unit = {
     spark.sql("set hoodie.metadata.index.column.stats.enable=false")
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
     spark.sparkContext.persistentRdds.foreach(rdd => rdd._2.unpersist())
     initQueryIndexConf()
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala
index 29cff81beebb..e04fd6cbdc6b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala
@@ -124,6 +124,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
     tmpDir = Utils.createTempDir()
 
     spark.sql("set hoodie.parquet.small.file.limit=0")
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
     // Setup shared test data
     setupSharedTestData()
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala
index 78ba187d924f..8fb3fd8648c0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala
@@ -32,6 +32,10 @@ import org.junit.jupiter.api.Assertions.{assertFalse, 
assertTrue}
 
 class TestIndexSyntax extends HoodieSparkSqlTestBase {
 
+  override protected def beforeAll(): Unit = {
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+  }
+
   test("Test Create/Drop/Show/Refresh Index") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
index 563c3bf363c9..f31d92d6c753 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
@@ -62,6 +62,10 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
     DataSourceWriteOptions.RECORD_MERGE_MODE.key() -> 
RecordMergeMode.COMMIT_TIME_ORDERING.name()
   ) ++ metadataOpts
 
+  override protected def beforeAll(): Unit = {
+    spark.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+  }
+
   test("Test Create/Show/Drop Secondary Index with External Table") {
     withRDDPersistenceValidation {
       withTempDir { tmp =>
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 19cd9473f128..ec781d9bb616 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -313,6 +313,7 @@ public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase
       // To overwrite the table properties created during test setup
       storage.deleteDirectory(metaClient.getBasePath());
 
+      sparkSession.sql("set hoodie.write.lock.provider = 
org.apache.hudi.client.transaction.lock.InProcessLockProvider");
       sparkSession.sql(
           "create table tbl ("
               + "ts bigint, "
@@ -355,6 +356,8 @@ public class TestHoodieMetadataTableValidator extends 
HoodieSparkClientTestBase
 
       // validate MDT partition stats
       validateSecondaryIndex();
+
+      sparkSession.sql("set hoodie.write.lock.provider=");
     });
   }
 

Reply via email to