This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a399bc5c4e7 [HUDI-6459] Add Rollback and multi-writer tests for Record Level Index (#9105)
a399bc5c4e7 is described below

commit a399bc5c4e733f0a688b96dd9f9112ec4efbc2b6
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Aug 2 03:02:44 2023 +0530

    [HUDI-6459] Add Rollback and multi-writer tests for Record Level Index (#9105)
    
    - Adding rollback and multi-writer tests with RLI
---
 .../hudi/functional/TestRecordLevelIndex.scala     | 151 ++++++++++++++++-----
 1 file changed, 120 insertions(+), 31 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 614a412c4a5..015c4262ceb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -23,13 +23,15 @@ import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import 
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
 import org.apache.hudi.client.utils.MetadataConversionUtils
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, 
HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, 
HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieWriteConflictException
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JavaConversions
@@ -40,11 +42,15 @@ import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
 
+import java.util.concurrent.Executors
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.stream.Collectors
 import java.util.{Collections, Properties}
 import scala.collection.JavaConverters._
 import scala.collection.{JavaConverters, mutable}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.postfixOps
 import scala.util.Using
 
 @Tag("functional")
@@ -239,14 +245,15 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
 
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
-  def testRLIWithCleaning(tableType: HoodieTableType): Unit = {
+  def testRLIWithDTCleaning(tableType: HoodieTableType): Unit = {
     var hudiOpts = commonOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
       HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       hudiOpts = hudiOpts ++ Map(
         HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
-        HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1"
+        HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+        HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15"
       )
     }
 
@@ -269,7 +276,11 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
     
assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.lastInstant().get().getTimestamp
       .compareTo(lastCleanInstant.get().getTimestamp) > 0)
 
-    rollbackLastInstant(hudiOpts)
+    var rollbackedInstant: Option[HoodieInstant] = Option.empty
+    while (rollbackedInstant.isEmpty || rollbackedInstant.get.getAction != 
ActionType.clean.name()) {
+      // rollback clean instant
+      rollbackedInstant = Option.apply(rollbackLastInstant(hudiOpts))
+    }
     validateDataAndRecordIndices(hudiOpts)
   }
 
@@ -292,18 +303,19 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append)
 
-    val lastCompactionInstant = getLatestCompactionInstant()
+    var lastCompactionInstant = getLatestCompactionInstant()
     assertTrue(lastCompactionInstant.isPresent)
 
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append)
     
assertTrue(getLatestCompactionInstant().get().getTimestamp.compareTo(lastCompactionInstant.get().getTimestamp)
 > 0)
+    lastCompactionInstant = getLatestCompactionInstant()
 
-    val writeConfig = getWriteConfig(hudiOpts)
-    Using(new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), 
writeConfig)) { client =>
-      val lastInstant = getHoodieTable(metaClient, 
writeConfig).getCompletedCommitsTimeline.lastInstant()
-      client.rollback(lastInstant.get().getTimestamp)
+    var rollbackedInstant: Option[HoodieInstant] = Option.empty
+    while (rollbackedInstant.isEmpty || rollbackedInstant.get.getTimestamp != 
lastCompactionInstant.get().getTimestamp) {
+      // rollback compaction instant
+      rollbackedInstant = Option.apply(rollbackLastInstant(hudiOpts))
     }
     validateDataAndRecordIndices(hudiOpts)
   }
@@ -335,6 +347,9 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
       saveMode = SaveMode.Append)
 
     
assertTrue(getLatestClusteringInstant().get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp)
 > 0)
+    assertEquals(getLatestClusteringInstant(), 
metaClient.getActiveTimeline.lastInstant())
+    // We are validating rollback of a DT clustering instant here
+    rollbackLastInstant(hudiOpts)
     validateDataAndRecordIndices(hudiOpts)
   }
 
@@ -433,39 +448,109 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
     assertTrue(compactionBaseFile.isPresent)
   }
 
-  @Disabled("needs investigation")
+  @Disabled("Would take a long time to run on regular basis")
   @ParameterizedTest
   @EnumSource(classOf[HoodieTableType])
   def testRLIWithMDTCleaning(tableType: HoodieTableType): Unit = {
-    val hudiOpts = commonOpts ++ Map(
-      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
-
+    var hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1")
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite)
-    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+
+    hudiOpts = hudiOpts + 
(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "40")
+    val function = () => doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Append)
+    executeFunctionNTimes(function, 20)
+
+    
assertTrue(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isPresent)
+    rollbackLastInstant(hudiOpts)
+    // Rolling back clean instant from MDT
+    rollbackLastInstant(hudiOpts)
+    validateDataAndRecordIndices(hudiOpts)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIWithMultiWriter(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> 
"optimistic_concurrency_control",
+      HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> "LAZY",
+      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> 
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
+      HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> 
classOf[PreferWriterConflictResolutionStrategy].getName
+    )
+
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
-      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Append)
-    val metadataTableFSView = getHoodieTable(metaClient, 
getWriteConfig(hudiOpts)).getMetadata
-      .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
-    assertTrue(
-      metadataTableFSView.getTimeline
-        .filter(JavaConversions.getPredicate(instant => instant.getAction == 
ActionType.clean.name()))
-        .lastInstant()
-        .isPresent)
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+
+    val executor = Executors.newFixedThreadPool(2)
+    implicit val executorContext: ExecutionContext = 
ExecutionContext.fromExecutor(executor)
+    val function = new Function0[Boolean] {
+      override def apply(): Boolean = {
+        try {
+          doWriteAndValidateDataAndRecordIndex(hudiOpts,
+            operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+            saveMode = SaveMode.Append,
+            validate = false)
+          true
+        } catch {
+          case _: HoodieWriteConflictException => false
+          case e => throw new Exception("Multi write failed", e)
+        }
+      }
+    }
+    val f1 = Future[Boolean] {
+      function.apply()
+    }
+    val f2 = Future[Boolean] {
+      function.apply()
+    }
+
+    Await.result(f1, Duration("5 minutes"))
+    Await.result(f2, Duration("5 minutes"))
+
+    assertTrue(f1.value.get.get || f2.value.get.get)
+    executor.shutdownNow()
+    validateDataAndRecordIndices(hudiOpts)
+  }
+
+  private def getLatestMetaClient(enforce: Boolean): HoodieTableMetaClient = {
+    val lastInsant = String.format("%03d", new 
Integer(instantTime.incrementAndGet()))
+    if (enforce || 
metaClient.getActiveTimeline.lastInstant().get().getTimestamp.compareTo(lastInsant)
 < 0) {
+      println("Reloaded timeline")
+      metaClient.reloadActiveTimeline()
+      metaClient
+    }
+    metaClient
   }
 
-  private def rollbackLastInstant(hudiOpts: Map[String, String]): Unit = {
-    if (getLatestCompactionInstant() != 
metaClient.getCommitsAndCompactionTimeline.lastInstant()) {
+  private def rollbackLastInstant(hudiOpts: Map[String, String]): 
HoodieInstant = {
+    val lastInstant = getLatestMetaClient(false).getActiveTimeline
+      .filter(JavaConversions.getPredicate(instant => instant.getAction != 
ActionType.rollback.name()))
+      .lastInstant().get()
+    if (getLatestCompactionInstant() != 
getLatestMetaClient(false).getActiveTimeline.lastInstant()
+      && lastInstant.getAction != ActionType.replacecommit.name()
+      && lastInstant.getAction != ActionType.clean.name()) {
       mergedDfList = mergedDfList.take(mergedDfList.size - 1)
     }
     val writeConfig = getWriteConfig(hudiOpts)
-    Using(new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), 
writeConfig)) { client =>
-      val lastInstant = getHoodieTable(metaClient, 
writeConfig).getCompletedCommitsTimeline.lastInstant()
-      client.rollback(lastInstant.get().getTimestamp)
+    new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
+      .rollback(lastInstant.getTimestamp)
+
+    if (lastInstant.getAction != ActionType.clean.name()) {
+      assertEquals(ActionType.rollback.name(), 
getLatestMetaClient(true).getActiveTimeline.lastInstant().get().getAction)
+    }
+    lastInstant
+  }
+
+  private def executeFunctionNTimes[T](function0: Function0[T], n : Int): Unit 
= {
+    for (i <- 1 to n) {
+      function0.apply()
     }
   }
 
@@ -486,8 +571,12 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
     mergedDfList = mergedDfList.take(mergedDfList.size - 1)
   }
 
+  private def getMetadataMetaClient(hudiOpts: Map[String, String]): 
HoodieTableMetaClient = {
+    getHoodieTable(metaClient, 
getWriteConfig(hudiOpts)).getMetadataTable.asInstanceOf[HoodieBackedTableMetadata].getMetadataMetaClient
+  }
+
   private def getLatestCompactionInstant(): 
org.apache.hudi.common.util.Option[HoodieInstant] = {
-    metaClient.reloadActiveTimeline()
+    getLatestMetaClient(false).getActiveTimeline
       .filter(JavaConversions.getPredicate(s => Option(
         try {
           val commitMetadata = 
MetadataConversionUtils.getHoodieCommitMetadata(metaClient, s)
@@ -502,7 +591,7 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
   }
 
   private def getLatestClusteringInstant(): 
org.apache.hudi.common.util.Option[HoodieInstant] = {
-    metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant()
+    
getLatestMetaClient(false).getActiveTimeline.getCompletedReplaceTimeline.lastInstant()
   }
 
   private def doWriteAndValidateDataAndRecordIndex(hudiOpts: Map[String, 
String],
@@ -573,7 +662,7 @@ class TestRecordLevelIndex extends 
HoodieSparkClientTestBase {
   }
 
   private def getInstantTime(): String = {
-    String.format("%03d", new Integer(instantTime.getAndIncrement()))
+    String.format("%03d", new Integer(instantTime.incrementAndGet()))
   }
 
   private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig 
= {

Reply via email to