This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7dcccd6073938f96f0ba6ebb2e68828554d75bfe Author: Lin Liu <[email protected]> AuthorDate: Sun Oct 26 20:46:19 2025 -0700 fix: Give proper error message for multi-writer scenarios without lock provider set (#14119) --------- Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: sivabalan <[email protected]> --- .../apache/hudi/index/HoodieSparkIndexClient.java | 25 ++++++---------------- .../TestPartitionStatsIndexWithSql.scala | 4 ++-- .../functional/TestRecordLevelIndexWithSQL.scala | 2 ++ .../functional/TestSecondaryIndexDataTypes.scala | 5 +++++ .../functional/TestSecondaryIndexPruning.scala | 8 ++++++- .../hudi/feature/index/TestExpressionIndex.scala | 1 + .../TestHoodieBackedTableMetadataIndexLookup.scala | 1 + .../sql/hudi/feature/index/TestIndexSyntax.scala | 4 ++++ .../hudi/feature/index/TestSecondaryIndex.scala | 4 ++++ .../TestHoodieMetadataTableValidator.java | 3 +++ 10 files changed, 36 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java index ad2f909ef63c..c2c0854f1233 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java @@ -21,8 +21,6 @@ package org.apache.hudi.index; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; -import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieIndexingConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; @@ -35,6 +33,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; 
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -43,7 +42,6 @@ import org.apache.hudi.exception.HoodieMetadataIndexException; import org.apache.hudi.index.record.HoodieRecordIndex; import org.apache.hudi.metadata.HoodieIndexVersion; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.table.action.index.BaseHoodieIndexClient; import org.apache.spark.api.java.JavaSparkContext; @@ -51,7 +49,6 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -222,6 +219,12 @@ public class HoodieSparkIndexClient extends BaseHoodieIndexClient { .withEngineType(EngineType.SPARK) .withProps(configs) .build(); + // Validate that a lock provider class is set properly. + if (localWriteConfig.getWriteConcurrencyMode().supportsMultiWriter() && StringUtils.isNullOrEmpty(localWriteConfig.getLockProviderClass())) { + throw new IllegalArgumentException( + "To create index asynchronously, multi-writer configurations need to be enabled and hence 'hoodie.write.lock.provider' is expected to be set for such cases. 
" + + "For single writer mode, feel free to set the config value to org.apache.hudi.client.transaction.lock.InProcessLockProvider and retry index creation"); + } return new SparkRDDWriteClient(engineContextOpt.get(), localWriteConfig, Option.empty()); } catch (Exception e) { throw new HoodieException("Failed to create write client while performing index operation ", e); @@ -259,8 +262,6 @@ public class HoodieSparkIndexClient extends BaseHoodieIndexClient { Map<String, String> writeConfig = new HashMap<>(); if (metaClient.getTableConfig().isMetadataTableAvailable()) { writeConfig.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()); - writeConfig.putAll(getLockOptions(metaClient.getBasePath().toString(), - metaClient.getBasePath().toUri().getScheme(), new TypedProperties())); // [HUDI-7472] Ensure write-config contains the existing MDT partition to prevent those from getting deleted metaClient.getTableConfig().getMetadataPartitions().forEach(partitionPath -> { @@ -289,16 +290,4 @@ public class HoodieSparkIndexClient extends BaseHoodieIndexClient { HoodieIndexingConfig.fromIndexDefinition(indexDefinition).getProps().forEach((key, value) -> writeConfig.put(key.toString(), value.toString()))); return writeConfig; } - - static Map<String, String> getLockOptions(String tablePath, String scheme, TypedProperties lockConfig) { - List<String> customSupportedFSs = lockConfig.getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), ",", new ArrayList<String>()); - if (scheme == null || customSupportedFSs.contains(scheme) || StorageSchemes.isAtomicCreationSupported(scheme)) { - TypedProperties props = FileSystemBasedLockProvider.getLockConfig(tablePath); - Map<String, String> toReturn = new HashMap<>(); - props.stringPropertyNames().stream().forEach(key -> toReturn.put(key, props.getString(key))); - return toReturn; - } else { - return Collections.emptyMap(); - } - } } diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala index 25a7fea6e205..6629b9523541 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala @@ -44,9 +44,9 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase { val sqlTempTable = "hudi_tbl" - @BeforeAll - def init(): Unit = { + override protected def beforeAll(): Unit = { initQueryIndexConf() + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") } test("Test drop partition stats index") { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala index 2e4a83ff0b4d..131da859edd0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala @@ -54,6 +54,7 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { "hoodie.metadata.index.column.stats.enable" -> "false", HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "false") + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") doWriteAndValidateDataAndRecordIndex(hudiOpts, operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite, @@ -65,6 +66,7 @@ class TestRecordLevelIndexWithSQL extends RecordLevelIndexTestBase { metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(HoodieTestUtils.getDefaultStorageConf).build() assertEquals(isPartitioned, HoodieRecordIndex.isPartitioned(metaClient.getIndexMetadata.get().getIndex(MetadataPartitionType.RECORD_INDEX.getPartitionPath).get())) assertTrue(MetadataPartitionType.RECORD_INDEX.isMetadataPartitionAvailable(getLatestMetaClient(true))) + spark.sql("set hoodie.write.lock.provider=") spark.sql(s"drop table $sqlTempTable") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala index 1305f0f26d94..d59038ec0d40 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala @@ -34,6 +34,11 @@ import org.junit.jupiter.api.Tag */ @Tag("functional") class TestSecondaryIndexDataTypes extends HoodieSparkSqlTestBase { + + override protected def beforeAll(): Unit = { + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") + } + /** * Test secondary index creation with all data types and verify query behavior. 
* diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 2bbaf74ce588..79ade67dc3fa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -46,7 +46,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} import org.apache.spark.sql.types.StringType -import org.junit.jupiter.api.{Tag, Test} +import org.junit.jupiter.api.{BeforeEach, Tag, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource} @@ -86,6 +86,12 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { override def conf: SparkConf = conf(getSparkSqlConf) + @BeforeEach + override def runBeforeEach(): Unit = { + super.runBeforeEach() + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") + } + @Test def testSecondaryIndexWithoutRecordIndex(): Unit = { tableName += "test_secondary_index_without_rli" diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala index abdb72fac284..e7a75fbc2a6a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala @@ -68,6 +68,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase with SparkAdapterSuppor override protected def beforeAll(): Unit = { spark.sql("set hoodie.metadata.index.column.stats.enable=false") + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") spark.sparkContext.persistentRdds.foreach(rdd => rdd._2.unpersist()) initQueryIndexConf() } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala index 29cff81beebb..e04fd6cbdc6b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestHoodieBackedTableMetadataIndexLookup.scala @@ -124,6 +124,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS tmpDir = Utils.createTempDir() spark.sql("set hoodie.parquet.small.file.limit=0") + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") // Setup shared test data setupSharedTestData() } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala index 78ba187d924f..8fb3fd8648c0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestIndexSyntax.scala @@ -32,6 +32,10 
@@ import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue} class TestIndexSyntax extends HoodieSparkSqlTestBase { + override protected def beforeAll(): Unit = { + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") + } + test("Test Create/Drop/Show/Refresh Index") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala index 563c3bf363c9..f31d92d6c753 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala @@ -62,6 +62,10 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase { DataSourceWriteOptions.RECORD_MERGE_MODE.key() -> RecordMergeMode.COMMIT_TIME_ORDERING.name() ) ++ metadataOpts + override protected def beforeAll(): Unit = { + spark.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider") + } + test("Test Create/Show/Drop Secondary Index with External Table") { withRDDPersistenceValidation { withTempDir { tmp => diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java index 19cd9473f128..ec781d9bb616 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java @@ -313,6 +313,7 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase // To overwrite the table properties created during test setup 
storage.deleteDirectory(metaClient.getBasePath()); + sparkSession.sql("set hoodie.write.lock.provider = org.apache.hudi.client.transaction.lock.InProcessLockProvider"); sparkSession.sql( "create table tbl (" + "ts bigint, " @@ -355,6 +356,8 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase // validate MDT partition stats validateSecondaryIndex(); + + sparkSession.sql("set hoodie.write.lock.provider="); }); }
