This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c87663d085b [HUDI-8023] Add multiwriter test for secondary and partition stats index (#11948)
c87663d085b is described below
commit c87663d085b317316aef9ef4b3ead11eba4d8566
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Sep 19 03:29:11 2024 +0530
[HUDI-8023] Add multiwriter test for secondary and partition stats index (#11948)
---
.../hudi/functional/TestPartitionStatsIndex.scala | 60 ++++++++++-
.../functional/TestSecondaryIndexPruning.scala | 117 ++++++++++++++++++++-
2 files changed, 172 insertions(+), 5 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 3d398d5d295..60776a86b85 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -20,14 +20,17 @@
package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
-import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider
+import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
-
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.StringType
@@ -36,7 +39,10 @@ import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
+import java.util.concurrent.Executors
import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
/**
* Test cases on partition stats index with Spark datasource.
@@ -144,6 +150,56 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
verifyQueryPredicate(hudiOpts)
}
+  /**
+   * Test case to do a write with updates and then validate partition stats with multi-writer.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testPartitionStatsWithMultiWriter(tableType: HoodieTableType): Unit = {
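+    // Optimistic concurrency control with an in-process lock and lazy cleaning of
+    // failed writes, so that two writers in the same JVM can safely race.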
+    val hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
+      HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> HoodieFailedWritesCleaningPolicy.LAZY.name,
+      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> classOf[InProcessLockProvider].getName,
+      HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> classOf[SimpleConcurrentFileWritesConflictResolutionStrategy].getName
+    )
+
+    doWriteAndValidateDataAndPartitionStats(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+
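+    // Both writers run the same upsert concurrently on a two-thread pool.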
+    val executor = Executors.newFixedThreadPool(2)
+    implicit val executorContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
+    val function = new Function0[Boolean] {
+      override def apply(): Boolean = {
+        try {
+          doWriteAndValidateDataAndPartitionStats(hudiOpts,
+            operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+            saveMode = SaveMode.Append,
+            validate = false)
+          true
+        } catch {
+          case _: HoodieWriteConflictException => false
+          case e: Throwable => throw new Exception("Multi write failed", e)
+        }
+      }
+    }
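+    // Kick off the two writers; each future yields true on commit, false on write conflict.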
+    val f1 = Future[Boolean] {
+      function.apply()
+    }
+    val f2 = Future[Boolean] {
+      function.apply()
+    }
+
+    Await.result(f1, Duration("5 minutes"))
+    Await.result(f2, Duration("5 minutes"))
+
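+    // At least one writer must commit; a writer that loses the OCC race returns false.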
+    assertTrue(f1.value.get.get || f2.value.get.get)
+    executor.shutdownNow()
+    validateDataAndPartitionStats()
+  }
+
/**
* Test case to do a write with updates using partitionBy and validate partition filters pushed down to the physical plan.
*/
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 61a359d4959..d87e6c6cd34 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -21,11 +21,14 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
@@ -40,10 +43,13 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
import org.scalatest.Assertions.assertResult
+import java.util.concurrent.Executors
import scala.collection.JavaConverters
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
/**
* Test cases for secondary index
@@ -316,6 +322,111 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}
+  /**
+   * Test case to write with updates and validate secondary index with multiple writers.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testSecondaryIndexWithConcurrentWrites(tableType: HoodieTableType): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      val tableName = "hudi_multi_writer_table_" + tableType.name()
+
+      // Common Hudi options
+      val hudiOpts = commonOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+        HoodieWriteConfig.TBL_NAME.key -> tableName,
+        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
+        HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> HoodieFailedWritesCleaningPolicy.LAZY.name,
+        HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> classOf[InProcessLockProvider].getName,
+        HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> classOf[SimpleConcurrentFileWritesConflictResolutionStrategy].getName
+      )
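+      // Note: inline compaction stays disabled so the writers only race on ingestion commits.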
+
+      // Create the Hudi table
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  ts BIGINT,
+           |  record_key_col STRING,
+           |  not_record_key_col STRING,
+           |  partition_key_col STRING
+           |) USING hudi
+           | OPTIONS (
+           |  primaryKey = 'record_key_col',
+           |  preCombineField = 'ts',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true'
+           | )
+           | PARTITIONED BY (partition_key_col)
+           | LOCATION '$basePath'
+       """.stripMargin)
+      // Insert some data
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)")
+
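+      // Two writers concurrently upsert different rows in different partitions (p1 vs p2) under OCC.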
+      val executor = Executors.newFixedThreadPool(2)
+      implicit val executorContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
+      val function = new Function1[Int, Boolean] {
+        override def apply(writerId: Int): Boolean = {
+          try {
+            val data = if (writerId == 1) Seq(
+              (System.currentTimeMillis(), s"row$writerId", s"value${writerId}_1", s"p$writerId")
+            ) else Seq(
+              (System.currentTimeMillis(), s"row$writerId", s"value${writerId}_2", s"p$writerId")
+            )
+
+            val df = spark.createDataFrame(data).toDF("ts", "record_key_col", "not_record_key_col", "partition_key_col")
+            df.write.format("hudi")
+              .options(hudiOpts)
+              .mode("append")
+              .save(basePath)
+            true
+          } catch {
+            case _: HoodieWriteConflictException => false
+            case e: Throwable => throw new Exception("Multi write failed", e)
+          }
+        }
+      }
+      // Set up futures for two writers
+      val f1 = Future[Boolean] {
+        function.apply(1)
+      }(executorContext)
+      val f2 = Future[Boolean] {
+        function.apply(2)
+      }(executorContext)
+
+      Await.result(f1, Duration("5 minutes"))
+      Await.result(f2, Duration("5 minutes"))
+
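+      // The writers touch disjoint partitions, so both are expected to commit; at least one must.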
+      assertTrue(f1.value.get.get || f2.value.get.get)
+      executor.shutdownNow()
+
+      // Validate the secondary index got created
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+
+      assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+
+      // Query the secondary index metadata
+      checkAnswer(
+        s"""
+           |SELECT key, SecondaryIndexMetadata.recordKey, SecondaryIndexMetadata.isDeleted
+           |FROM hudi_metadata('$basePath')
+           |WHERE type=7
+       """.stripMargin)(
+ Seq("abc", "row1", true),
+ Seq("cde", "row2", true),
+ Seq("value1_1", "row1", false),
+ Seq("value2_2", "row2", false)
+ )
+    }
+  }
+
private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
  assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}