This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cbc7f81719e [HUDI-8234] Fix and enable testPartitionStatsWithMultiWriter (#12073)
cbc7f81719e is described below
commit cbc7f81719ecc34b02cda26247e676d5638d4dc7
Author: Lin Liu <[email protected]>
AuthorDate: Fri Oct 11 14:57:13 2024 -0700
[HUDI-8234] Fix and enable testPartitionStatsWithMultiWriter (#12073)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../testutils/HoodieSparkClientTestHarness.java | 1 -
.../common/testutils/HoodieCommonTestHarness.java | 85 +++++++++++++++++++++-
.../functional/PartitionStatsIndexTestBase.scala | 24 +++++-
.../hudi/functional/TestPartitionStatsIndex.scala | 33 +++++++--
4 files changed, 132 insertions(+), 11 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index 3f342f8e054..3cb89a5c68b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -133,7 +133,6 @@ public abstract class HoodieSparkClientTestHarness extends HoodieWriterClientTes
protected SparkSession sparkSession;
protected SQLContext sqlContext;
protected ExecutorService executorService;
- protected HoodieTableMetaClient metaClient;
protected SparkRDDWriteClient writeClient;
protected SparkRDDReadClient readClient;
protected HoodieTableFileSystemView tableView;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index e2d63d69270..be25eb4bfb2 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.testutils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
@@ -30,16 +32,27 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
/**
* The common hoodie test harness to provide the basic infrastructure.
*/
public class HoodieCommonTestHarness {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieCommonTestHarness.class);
protected static final String BASE_FILE_EXTENSION =
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+ protected static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
null;
protected String tableName;
protected String basePath;
@@ -159,4 +172,74 @@ public class HoodieCommonTestHarness {
protected HoodieTableType getTableType() {
return HoodieTableType.COPY_ON_WRITE;
}
+
+  /**
+   * Blocks until the table at {@code tablePath} has at least {@code numCommits} completed instants
+   * whose action equals {@code action}, looking at the active timeline's "all commits" view.
+   *
+   * @param tablePath  base path of the Hudi table to poll
+   * @param conf       storage configuration used to build the meta client while polling
+   * @param numCommits minimum number of matching completed instants to wait for
+   * @param action     action to match against {@link HoodieInstant#getAction()}
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  public void pollTimelineForAction(String tablePath, StorageConfiguration<?> conf, int numCommits, String action) throws InterruptedException {
+    Predicate<HoodieInstant> matchesAction = instant -> instant.getAction().equals(action);
+    pollForTimeline(tablePath, conf, numCommits, matchesAction, true);
+  }
+
+  /**
+   * Blocks until the commits timeline of the table at {@code tablePath} has at least
+   * {@code commits} completed instants, whatever their action.
+   *
+   * @param tablePath base path of the Hudi table to poll
+   * @param conf      storage configuration used to build the meta client while polling
+   * @param commits   minimum number of completed instants to wait for
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  public void pollForTimeline(String tablePath, StorageConfiguration<?> conf, int commits) throws InterruptedException {
+    final boolean useAllCommitsTimeline = false;
+    pollForTimeline(tablePath, conf, commits, anyInstant -> true, useAllCommitsTimeline);
+  }
+
+  /**
+   * Polls the timeline of the table at {@code tablePath} once per second until at least
+   * {@code commits} completed instants matching {@code filter} are present.
+   *
+   * @param tablePath      base path of the Hudi table to poll
+   * @param conf           storage configuration used to build a fresh meta client on every poll
+   * @param commits        minimum number of matching completed instants to wait for
+   * @param filter         selects which completed instants count towards {@code commits}
+   * @param pullAllCommits if true, inspect {@code getAllCommitsTimeline()}; otherwise {@code getCommitsTimeline()}
+   * @throws InterruptedException if interrupted while waiting; the polling task is cancelled in that case too
+   * @throws RuntimeException     if the condition is not met within the maximum wait
+   */
+  private void pollForTimeline(String tablePath, StorageConfiguration<?> conf, int commits,
+                               Predicate<HoodieInstant> filter, boolean pullAllCommits)
+      throws InterruptedException {
+    final int maxWaitInMinutes = 10;
+    // Starts with no permits; the poller releases one once the condition holds.
+    Semaphore semaphore = new Semaphore(0);
+    ScheduledFuture<?> scheduledFuture = getScheduledExecutorService().scheduleWithFixedDelay(() -> {
+      try {
+        // Build a new meta client on every tick so the timeline reflects the current state on storage.
+        HoodieTableMetaClient pollingMetaClient =
+            HoodieTableMetaClient.builder().setConf(conf).setBasePath(tablePath).build();
+        HoodieTimeline timeline = pullAllCommits
+            ? pollingMetaClient.getActiveTimeline().getAllCommitsTimeline()
+            : pollingMetaClient.getActiveTimeline().getCommitsTimeline();
+        long matchingInstants = timeline
+            .filterCompletedInstants()
+            .getInstants()
+            .stream()
+            .filter(filter)
+            .count();
+        if (matchingInstants >= commits) {
+          semaphore.release();
+        }
+      } catch (Exception e) {
+        // Keep polling on errors; the overall timeout below bounds the wait.
+        LOG.warn("Error in polling for timeline", e);
+      }
+    }, 0, 1, TimeUnit.SECONDS);
+    boolean timelineFound;
+    try {
+      timelineFound = semaphore.tryAcquire(maxWaitInMinutes, TimeUnit.MINUTES);
+    } finally {
+      // Always stop the poller, including on interruption, so it does not keep running on the shared executor.
+      scheduledFuture.cancel(true);
+    }
+    if (!timelineFound) {
+      throw new RuntimeException(String.format(
+          "Failed to create timeline in %d minutes", maxWaitInMinutes));
+    }
+  }
+
+  /**
+   * Returns the shared scheduled executor used for timeline polling. It is created lazily when it
+   * does not exist yet or has been shut down.
+   *
+   * <p>The executor is kept in a static field, and nothing in this class shuts it down. Its pool
+   * threads are therefore daemon threads, so they cannot outlive the tests that used them and keep
+   * the JVM alive. Cancelled tasks are removed from the work queue immediately.
+   */
+  protected ScheduledThreadPoolExecutor getScheduledExecutorService() {
+    if (scheduledThreadPoolExecutor == null || scheduledThreadPoolExecutor.isShutdown()) {
+      scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2, runnable -> {
+        Thread pollerThread = new Thread(runnable, "hoodie-test-timeline-poller");
+        pollerThread.setDaemon(true);
+        return pollerThread;
+      });
+      scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
+    }
+    return scheduledThreadPoolExecutor;
+  }
+
+  /**
+   * Reloads the {@code metaClient} field from storage, stores the reloaded instance back into the
+   * field, and returns its active timeline.
+   */
+  protected HoodieActiveTimeline getActiveTimeline() {
+    HoodieTableMetaClient reloaded = HoodieTableMetaClient.reload(metaClient);
+    metaClient = reloaded;
+    return reloaded.getActiveTimeline();
+  }
+
+  /**
+   * Reloads the active timeline and reports whether it holds any instant whose timestamp does not
+   * belong to a completed instant, i.e. at least one instant has not completed yet.
+   */
+  protected Boolean hasPendingCommits() {
+    HoodieActiveTimeline activeTimeline = getActiveTimeline();
+    Set<String> completedTimestamps = activeTimeline.filterCompletedInstants()
+        .getInstants()
+        .stream()
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
+    boolean anyPending = activeTimeline.getInstants()
+        .stream()
+        .map(HoodieInstant::getTimestamp)
+        .anyMatch(timestamp -> !completedTimestamps.contains(timestamp));
+    return anyPending;
+  }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
index 2818e1c50e6..08d8c68b0d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant,
MetadataConversionUtils}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
+import
org.apache.hudi.functional.PartitionStatsIndexTestBase.{checkIfOverlapped}
import org.apache.hudi.metadata.HoodieBackedTableMetadata
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -202,7 +203,7 @@ class PartitionStatsIndexTestBase extends HoodieSparkClientTestBase {
/**
* @return [[DataFrame]] that should not exist as of the latest instant;
used for non-existence validation.
*/
- protected def calculateMergedDf(latestBatchDf: DataFrame, operation:
String): DataFrame = {
+ protected def calculateMergedDf(latestBatchDf: DataFrame, operation:
String): DataFrame = synchronized {
val prevDfOpt = mergedDfList.lastOption
if (prevDfOpt.isEmpty) {
mergedDfList = mergedDfList :+ latestBatchDf
@@ -276,4 +277,25 @@ class PartitionStatsIndexTestBase extends HoodieSparkClientTestBase {
false
}
}
+
+  /**
+   * Checks whether the latest completed instant overlaps in time with any earlier completed instant.
+   *
+   * Reloads `metaClient` as a side effect. Returns false when the timeline has no completed
+   * instants, instead of failing on `last` of an empty collection.
+   */
+  def checkIfCommitsAreConcurrent(): Boolean = {
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val completedInstants = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.asScala.toList
+    completedInstants match {
+      case Nil => false
+      case _ => findConcurrentInstants(completedInstants.last, completedInstants.init).nonEmpty
+    }
+  }
+
+  /**
+   * Returns, in their original order, the instants from `instants` whose time span overlaps that of
+   * `givenInstant` according to `checkIfOverlapped`.
+   */
+  def findConcurrentInstants(givenInstant: HoodieInstant, instants: List[HoodieInstant]): List[HoodieInstant] =
+    for {
+      candidate <- instants
+      if checkIfOverlapped(candidate, givenInstant)
+    } yield candidate
+}
+
+object PartitionStatsIndexTestBase {
+  /**
+   * Checks whether two completed instants overlap in time. Each instant is treated as the closed
+   * range [timestamp, completion time], with bounds compared via `compareTo`. The second comparison
+   * is evaluated only when the first one does not already rule out an overlap.
+   */
+  def checkIfOverlapped(a: HoodieInstant, b: HoodieInstant): Boolean = {
+    val aEndsBeforeBStarts = a.getCompletionTime.compareTo(b.getTimestamp) < 0
+    lazy val aStartsAfterBEnds = a.getTimestamp.compareTo(b.getCompletionTime) > 0
+    !aEndsBeforeBStarts && !aStartsAfterBEnds
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index cba98f4a834..501613b5f2e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
+import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
import
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -37,11 +37,12 @@ import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, Literal}
import org.apache.spark.sql.types.StringType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{Disabled, Tag, Test}
+import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
import java.util.concurrent.Executors
+import java.util.stream.Stream
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -156,9 +157,8 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
* Test case to do a write with updates and then validate partition stats
with multi-writer.
*/
@ParameterizedTest
- @EnumSource(classOf[HoodieTableType])
- @Disabled("HUDI-8234")
- def testPartitionStatsWithMultiWriter(tableType: HoodieTableType): Unit = {
+ @MethodSource(Array("supplyTestArguments"))
+ def testPartitionStatsWithMultiWriter(tableType: HoodieTableType, useUpsert:
Boolean): Unit = {
val hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() ->
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
@@ -178,7 +178,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
override def apply(): Boolean = {
try {
doWriteAndValidateDataAndPartitionStats(hudiOpts,
- operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ operation = if (useUpsert) UPSERT_OPERATION_OPT_VAL else
BULK_INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
validate = false)
true
@@ -197,9 +197,16 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
Await.result(f1, Duration("5 minutes"))
Await.result(f2, Duration("5 minutes"))
-
assertTrue(f1.value.get.get || f2.value.get.get)
executor.shutdownNow()
+
+ if (useUpsert) {
+ pollForTimeline(basePath, storageConf, 2)
+ assertTrue(hasPendingCommits)
+ } else {
+ pollForTimeline(basePath, storageConf, 3)
+ assertTrue(checkIfCommitsAreConcurrent())
+ }
validateDataAndPartitionStats()
}
@@ -401,3 +408,13 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline,
metadataWriter(getWriteConfig(opts)).getTableMetadata)
}
}
+
+object TestPartitionStatsIndex {
+  /**
+   * Parameters for `testPartitionStatsWithMultiWriter`: MERGE_ON_READ and COPY_ON_WRITE, each
+   * paired with `useUpsert` = true and then false.
+   */
+  def supplyTestArguments(): Stream[Arguments] = {
+    val tableTypes = Seq(HoodieTableType.MERGE_ON_READ, HoodieTableType.COPY_ON_WRITE)
+    val upsertFlags = Seq(java.lang.Boolean.TRUE, java.lang.Boolean.FALSE)
+    val combinations = for {
+      tableType <- tableTypes
+      useUpsert <- upsertFlags
+    } yield Arguments.of(tableType, useUpsert)
+    combinations.asJava.stream()
+  }
+}