This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c87663d085b [HUDI-8023] Add multiwriter test for secondary and 
partition stats index (#11948)
c87663d085b is described below

commit c87663d085b317316aef9ef4b3ead11eba4d8566
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Sep 19 03:29:11 2024 +0530

    [HUDI-8023] Add multiwriter test for secondary and partition stats index 
(#11948)
---
 .../hudi/functional/TestPartitionStatsIndex.scala  |  60 ++++++++++-
 .../functional/TestSecondaryIndexPruning.scala     | 117 ++++++++++++++++++++-
 2 files changed, 172 insertions(+), 5 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 3d398d5d295..60776a86b85 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -20,14 +20,17 @@
 package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
-import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider
+import org.apache.hudi.common.model.{FileSlice, 
HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieLockConfig, 
HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieWriteConflictException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex}
-
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
 import org.apache.spark.sql.types.StringType
@@ -36,7 +39,10 @@ import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 
+import java.util.concurrent.Executors
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 /**
  * Test cases on partition stats index with Spark datasource.
@@ -144,6 +150,56 @@ class TestPartitionStatsIndex extends 
PartitionStatsIndexTestBase {
     verifyQueryPredicate(hudiOpts)
   }
 
+  /**
+   * Test case to do a write with updates and then validate partition stats 
with multi-writer.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testPartitionStatsWithMultiWriter(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
+      HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> 
HoodieFailedWritesCleaningPolicy.LAZY.name,
+      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> 
classOf[InProcessLockProvider].getName,
+      HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> 
classOf[SimpleConcurrentFileWritesConflictResolutionStrategy].getName
+    )
+
+    doWriteAndValidateDataAndPartitionStats(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+
+    val executor = Executors.newFixedThreadPool(2)
+    implicit val executorContext: ExecutionContext = 
ExecutionContext.fromExecutor(executor)
+    val function = new Function0[Boolean] {
+      override def apply(): Boolean = {
+        try {
+          doWriteAndValidateDataAndPartitionStats(hudiOpts,
+            operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+            saveMode = SaveMode.Append,
+            validate = false)
+          true
+        } catch {
+          case _: HoodieWriteConflictException => false
+          case e => throw new Exception("Multi write failed", e)
+        }
+      }
+    }
+    val f1 = Future[Boolean] {
+      function.apply()
+    }
+    val f2 = Future[Boolean] {
+      function.apply()
+    }
+
+    Await.result(f1, Duration("5 minutes"))
+    Await.result(f2, Duration("5 minutes"))
+
+    assertTrue(f1.value.get.get || f2.value.get.get)
+    executor.shutdownNow()
+    validateDataAndPartitionStats()
+  }
+
   /**
    * Test case to do a write with updates using partitionBy and validation 
partition filters pushed down to physical plan.
    */
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 61a359d4959..d87e6c6cd34 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -21,11 +21,14 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, 
PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.model.{FileSlice, 
HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieWriteConflictException
 import 
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
 import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, 
HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
@@ -40,10 +43,13 @@ import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
 import org.scalatest.Assertions.assertResult
 
+import java.util.concurrent.Executors
 import scala.collection.JavaConverters
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 /**
  * Test cases for secondary index
@@ -316,6 +322,111 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  /**
+   * Test case to write with updates and validate secondary index with 
multiple writers.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testSecondaryIndexWithConcurrentWrites(tableType: HoodieTableType): Unit 
= {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      val tableName = "hudi_multi_writer_table_" + tableType.name()
+
+      // Common Hudi options
+      val hudiOpts = commonOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+        HoodieWriteConfig.TBL_NAME.key -> tableName,
+        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
+        HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> 
HoodieFailedWritesCleaningPolicy.LAZY.name,
+        HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> 
classOf[InProcessLockProvider].getName,
+        HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() 
-> classOf[SimpleConcurrentFileWritesConflictResolutionStrategy].getName
+      )
+
+      // Create the Hudi table
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (
+           |  ts BIGINT,
+           |  record_key_col STRING,
+           |  not_record_key_col STRING,
+           |  partition_key_col STRING
+           |) USING hudi
+           | OPTIONS (
+           |  primaryKey = 'record_key_col',
+           |  preCombineField = 'ts',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true'
+           | )
+           | PARTITIONED BY (partition_key_col)
+           | LOCATION '$basePath'
+       """.stripMargin)
+      // Insert some data
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+      spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+
+      val executor = Executors.newFixedThreadPool(2)
+      implicit val executorContext: ExecutionContext = 
ExecutionContext.fromExecutor(executor)
+      val function = new Function1[Int, Boolean] {
+        override def apply(writerId: Int): Boolean = {
+          try {
+            val data = if(writerId == 1) Seq(
+              (System.currentTimeMillis(), s"row$writerId", 
s"value${writerId}_1", s"p$writerId")
+            ) else Seq(
+              (System.currentTimeMillis(), s"row$writerId", 
s"value${writerId}_2", s"p$writerId")
+            )
+
+            val df = spark.createDataFrame(data).toDF("ts", "record_key_col", 
"not_record_key_col", "partition_key_col")
+            df.write.format("hudi")
+              .options(hudiOpts)
+              .mode("append")
+              .save(basePath)
+            true
+          } catch {
+            case _: HoodieWriteConflictException => false
+            case e => throw new Exception("Multi write failed", e)
+          }
+        }
+      }
+      // Set up futures for two writers
+      val f1 = Future[Boolean] {
+        function.apply(1)
+      }(executorContext)
+      val f2 = Future[Boolean] {
+        function.apply(2)
+      }(executorContext)
+
+      Await.result(f1, Duration("5 minutes"))
+      Await.result(f2, Duration("5 minutes"))
+
+      assertTrue(f1.value.get.get || f2.value.get.get)
+      executor.shutdownNow()
+
+      // Validate the secondary index got created
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+
+      // Query the secondary index metadata
+      checkAnswer(
+        s"""
+           |SELECT key, SecondaryIndexMetadata.recordKey, 
SecondaryIndexMetadata.isDeleted
+           |FROM hudi_metadata('$basePath')
+           |WHERE type=7
+       """.stripMargin)(
+        Seq("abc", "row1", true),
+        Seq("cde", "row2", true),
+        Seq("value1_1", "row1", false),
+        Seq("value2_2", "row2", false)
+      )
+    }
+  }
+
   private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
     assertResult(expects.map(row => Row(row: 
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
   }

Reply via email to