This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a399bc5c4e7 [HUDI-6459] Add Rollback and multi-writer tests for Record Level Index (#9105)
a399bc5c4e7 is described below
commit a399bc5c4e733f0a688b96dd9f9112ec4efbc2b6
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Aug 2 03:02:44 2023 +0530
[HUDI-6459] Add Rollback and multi-writer tests for Record Level Index (#9105)
- Adding rollback and multi-writer tests with RLI
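- For context, the new multi-writer test drives two concurrent upserts under optimistic concurrency control. A minimal sketch of the writer options involved is shown below; the lock provider and conflict resolution strategy classes come from the test diff, while the record index key, table name, key fields, df and basePath are illustrative assumptions:

    // Sketch only: option keys mirroring testRLIWithMultiWriter; values marked "assumed" are not from this commit.
    // Imports as in TestRecordLevelIndex.scala below (DataSourceWriteOptions, HoodieWriteConfig,
    // HoodieCleanConfig, HoodieLockConfig, PreferWriterConflictResolutionStrategy).
    val writeOpts = Map(
      "hoodie.table.name" -> "trips_rli",                      // assumed table name
      DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "uuid",  // assumed record key field
      DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "ts",   // assumed precombine field
      "hoodie.metadata.record.index.enable" -> "true",         // enable record level index (key name assumed)
      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> "optimistic_concurrency_control",
      HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> "LAZY",
      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
      HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> classOf[PreferWriterConflictResolutionStrategy].getName)
    // Each concurrent writer then appends with these options, e.g.:
    // df.write.format("hudi").options(writeOpts).mode(SaveMode.Append).save(basePath)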
---
.../hudi/functional/TestRecordLevelIndex.scala | 151 ++++++++++++++++-----
1 file changed, 120 insertions(+), 31 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 614a412c4a5..015c4262ceb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -23,13 +23,15 @@ import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
import org.apache.hudi.client.utils.MetadataConversionUtils
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JavaConversions
@@ -40,11 +42,15 @@ import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import scala.collection.{JavaConverters, mutable}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.postfixOps
import scala.util.Using
@Tag("functional")
@@ -239,14 +245,15 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
- def testRLIWithCleaning(tableType: HoodieTableType): Unit = {
+ def testRLIWithDTCleaning(tableType: HoodieTableType): Unit = {
var hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
if (tableType == HoodieTableType.MERGE_ON_READ) {
hudiOpts = hudiOpts ++ Map(
HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
- HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1"
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15"
)
}
@@ -269,7 +276,11 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.lastInstant().get().getTimestamp
.compareTo(lastCleanInstant.get().getTimestamp) > 0)
- rollbackLastInstant(hudiOpts)
+ var rollbackedInstant: Option[HoodieInstant] = Option.empty
+ while (rollbackedInstant.isEmpty || rollbackedInstant.get.getAction != ActionType.clean.name()) {
+ // rollback clean instant
+ rollbackedInstant = Option.apply(rollbackLastInstant(hudiOpts))
+ }
validateDataAndRecordIndices(hudiOpts)
}
@@ -292,18 +303,19 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
- val lastCompactionInstant = getLatestCompactionInstant()
+ var lastCompactionInstant = getLatestCompactionInstant()
assertTrue(lastCompactionInstant.isPresent)
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
assertTrue(getLatestCompactionInstant().get().getTimestamp.compareTo(lastCompactionInstant.get().getTimestamp) > 0)
+ lastCompactionInstant = getLatestCompactionInstant()
- val writeConfig = getWriteConfig(hudiOpts)
- Using(new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc),
writeConfig)) { client =>
- val lastInstant = getHoodieTable(metaClient, writeConfig).getCompletedCommitsTimeline.lastInstant()
- client.rollback(lastInstant.get().getTimestamp)
+ var rollbackedInstant: Option[HoodieInstant] = Option.empty
+ while (rollbackedInstant.isEmpty || rollbackedInstant.get.getTimestamp != lastCompactionInstant.get().getTimestamp) {
+ // rollback compaction instant
+ rollbackedInstant = Option.apply(rollbackLastInstant(hudiOpts))
}
validateDataAndRecordIndices(hudiOpts)
}
@@ -335,6 +347,9 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
saveMode = SaveMode.Append)
assertTrue(getLatestClusteringInstant().get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp) > 0)
+ assertEquals(getLatestClusteringInstant(), metaClient.getActiveTimeline.lastInstant())
+ // We are validating rollback of a DT clustering instant here
+ rollbackLastInstant(hudiOpts)
validateDataAndRecordIndices(hudiOpts)
}
@@ -433,39 +448,109 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
assertTrue(compactionBaseFile.isPresent)
}
- @Disabled("needs investigation")
+ @Disabled("Would take a long time to run on regular basis")
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testRLIWithMDTCleaning(tableType: HoodieTableType): Unit = {
- val hudiOpts = commonOpts ++ Map(
- DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
-
+ var hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1")
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite)
- doWriteAndValidateDataAndRecordIndex(hudiOpts,
+
+ hudiOpts = hudiOpts + (HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "40")
+ val function = () => doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
+ executeFunctionNTimes(function, 20)
+
+ assertTrue(getMetadataMetaClient(hudiOpts).getActiveTimeline.getCleanerTimeline.lastInstant().isPresent)
+ rollbackLastInstant(hudiOpts)
+ // Rolling back clean instant from MDT
+ rollbackLastInstant(hudiOpts)
+ validateDataAndRecordIndices(hudiOpts)
+ }
+
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieTableType])
+ def testRLIWithMultiWriter(tableType: HoodieTableType): Unit = {
+ val hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> "optimistic_concurrency_control",
+ HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> "LAZY",
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
+ HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key() -> classOf[PreferWriterConflictResolutionStrategy].getName
+ )
+
doWriteAndValidateDataAndRecordIndex(hudiOpts,
- operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
- val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadata
- .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
- assertTrue(
- metadataTableFSView.getTimeline
- .filter(JavaConversions.getPredicate(instant => instant.getAction == ActionType.clean.name()))
- .lastInstant()
- .isPresent)
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ validate = false)
+
+ val executor = Executors.newFixedThreadPool(2)
+ implicit val executorContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
+ val function = new Function0[Boolean] {
+ override def apply(): Boolean = {
+ try {
+ doWriteAndValidateDataAndRecordIndex(hudiOpts,
+ operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ validate = false)
+ true
+ } catch {
+ case _: HoodieWriteConflictException => false
+ case e => throw new Exception("Multi write failed", e)
+ }
+ }
+ }
+ val f1 = Future[Boolean] {
+ function.apply()
+ }
+ val f2 = Future[Boolean] {
+ function.apply()
+ }
+
+ Await.result(f1, Duration("5 minutes"))
+ Await.result(f2, Duration("5 minutes"))
+
+ assertTrue(f1.value.get.get || f2.value.get.get)
+ executor.shutdownNow()
+ validateDataAndRecordIndices(hudiOpts)
+ }
+
+ private def getLatestMetaClient(enforce: Boolean): HoodieTableMetaClient = {
+ val lastInsant = String.format("%03d", new Integer(instantTime.incrementAndGet()))
+ if (enforce || metaClient.getActiveTimeline.lastInstant().get().getTimestamp.compareTo(lastInsant) < 0) {
+ println("Reloaded timeline")
+ metaClient.reloadActiveTimeline()
+ metaClient
+ }
+ metaClient
}
- private def rollbackLastInstant(hudiOpts: Map[String, String]): Unit = {
- if (getLatestCompactionInstant() != metaClient.getCommitsAndCompactionTimeline.lastInstant()) {
+ private def rollbackLastInstant(hudiOpts: Map[String, String]): HoodieInstant = {
+ val lastInstant = getLatestMetaClient(false).getActiveTimeline
+ .filter(JavaConversions.getPredicate(instant => instant.getAction != ActionType.rollback.name()))
+ .lastInstant().get()
+ if (getLatestCompactionInstant() != getLatestMetaClient(false).getActiveTimeline.lastInstant()
+ && lastInstant.getAction != ActionType.replacecommit.name()
+ && lastInstant.getAction != ActionType.clean.name()) {
mergedDfList = mergedDfList.take(mergedDfList.size - 1)
}
val writeConfig = getWriteConfig(hudiOpts)
- Using(new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)) { client =>
- val lastInstant = getHoodieTable(metaClient, writeConfig).getCompletedCommitsTimeline.lastInstant()
- client.rollback(lastInstant.get().getTimestamp)
+ new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
+ .rollback(lastInstant.getTimestamp)
+
+ if (lastInstant.getAction != ActionType.clean.name()) {
+ assertEquals(ActionType.rollback.name(), getLatestMetaClient(true).getActiveTimeline.lastInstant().get().getAction)
+ }
+ lastInstant
+ }
+
+ private def executeFunctionNTimes[T](function0: Function0[T], n : Int): Unit = {
+ for (i <- 1 to n) {
+ function0.apply()
}
}
@@ -486,8 +571,12 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
mergedDfList = mergedDfList.take(mergedDfList.size - 1)
}
+ private def getMetadataMetaClient(hudiOpts: Map[String, String]): HoodieTableMetaClient = {
+ getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable.asInstanceOf[HoodieBackedTableMetadata].getMetadataMetaClient
+ }
+
private def getLatestCompactionInstant(): org.apache.hudi.common.util.Option[HoodieInstant] = {
- metaClient.reloadActiveTimeline()
+ getLatestMetaClient(false).getActiveTimeline
.filter(JavaConversions.getPredicate(s => Option(
try {
val commitMetadata = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, s)
@@ -502,7 +591,7 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
}
private def getLatestClusteringInstant(): org.apache.hudi.common.util.Option[HoodieInstant] = {
- metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant()
+ getLatestMetaClient(false).getActiveTimeline.getCompletedReplaceTimeline.lastInstant()
}
private def doWriteAndValidateDataAndRecordIndex(hudiOpts: Map[String, String],
@@ -573,7 +662,7 @@ class TestRecordLevelIndex extends HoodieSparkClientTestBase {
}
private def getInstantTime(): String = {
- String.format("%03d", new Integer(instantTime.getAndIncrement()))
+ String.format("%03d", new Integer(instantTime.incrementAndGet()))
}
private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {