This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cbc7f81719e [HUDI-8234] Fix and enable testPartitionStatsWithMultiWriter (#12073)
cbc7f81719e is described below

commit cbc7f81719ecc34b02cda26247e676d5638d4dc7
Author: Lin Liu <[email protected]>
AuthorDate: Fri Oct 11 14:57:13 2024 -0700

    [HUDI-8234] Fix and enable testPartitionStatsWithMultiWriter (#12073)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../testutils/HoodieSparkClientTestHarness.java    |  1 -
 .../common/testutils/HoodieCommonTestHarness.java  | 85 +++++++++++++++++++++-
 .../functional/PartitionStatsIndexTestBase.scala   | 24 +++++-
 .../hudi/functional/TestPartitionStatsIndex.scala  | 33 +++++++--
 4 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index 3f342f8e054..3cb89a5c68b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -133,7 +133,6 @@ public abstract class HoodieSparkClientTestHarness extends HoodieWriterClientTes
   protected SparkSession sparkSession;
   protected SQLContext sqlContext;
   protected ExecutorService executorService;
-  protected HoodieTableMetaClient metaClient;
   protected SparkRDDWriteClient writeClient;
   protected SparkRDDReadClient readClient;
   protected HoodieTableFileSystemView tableView;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index e2d63d69270..be25eb4bfb2 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.testutils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
@@ -30,16 +32,27 @@ import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * The common hoodie test harness to provide the basic infrastructure.
  */
 public class HoodieCommonTestHarness {
-
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieCommonTestHarness.class);
   protected static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+  protected static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;
 
   protected String tableName;
   protected String basePath;
@@ -159,4 +172,74 @@ public class HoodieCommonTestHarness {
   protected HoodieTableType getTableType() {
     return HoodieTableType.COPY_ON_WRITE;
   }
+
+  public void pollTimelineForAction(String tablePath, StorageConfiguration<?> conf, int numCommits, String action) throws InterruptedException {
+    pollForTimeline(tablePath, conf, numCommits, instant -> instant.getAction().equals(action), true);
+  }
+
+  public void pollForTimeline(String tablePath, StorageConfiguration<?> conf, int commits) throws InterruptedException {
+    pollForTimeline(tablePath, conf, commits, instant -> true, false);
+  }
+
+  private void pollForTimeline(String tablePath, StorageConfiguration<?> conf, int commits, Predicate<HoodieInstant> filter, boolean pullAllCommits)
+      throws InterruptedException {
+    Semaphore semaphore = new Semaphore(1);
+    semaphore.acquire();
+    ScheduledFuture<?> scheduledFuture = getScheduledExecutorService().scheduleWithFixedDelay(() -> {
+      try {
+        HoodieTableMetaClient metaClient =
+            HoodieTableMetaClient.builder().setConf(conf).setBasePath(tablePath).build();
+        HoodieTimeline timeline = pullAllCommits
+            ? metaClient.getActiveTimeline().getAllCommitsTimeline()
+            : metaClient.getActiveTimeline().getCommitsTimeline();
+        List<HoodieInstant> instants = timeline
+            .filterCompletedInstants()
+            .getInstants()
+            .stream()
+            .filter(filter::test)
+            .collect(Collectors.toList());
+        if (instants.size() >= commits) {
+          semaphore.release();
+        }
+      } catch (Exception e) {
+        LOG.warn("Error in polling for timeline", e);
+      }
+    }, 0, 1, TimeUnit.SECONDS);
+    int maxWaitInMinutes = 10;
+    boolean timelineFound = semaphore.tryAcquire(maxWaitInMinutes, TimeUnit.MINUTES);
+    scheduledFuture.cancel(true);
+    if (!timelineFound) {
+      throw new RuntimeException(String.format(
+          "Failed to create timeline in %d minutes", maxWaitInMinutes));
+    }
+  }
+
+  protected ScheduledThreadPoolExecutor getScheduledExecutorService() {
+    if (scheduledThreadPoolExecutor == null || scheduledThreadPoolExecutor.isShutdown()) {
+      scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
+      scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
+    }
+    return scheduledThreadPoolExecutor;
+  }
+
+  protected HoodieActiveTimeline getActiveTimeline() {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    return metaClient.getActiveTimeline();
+  }
+
+  protected Boolean hasPendingCommits() {
+    HoodieActiveTimeline timeline = getActiveTimeline();
+    HoodieTimeline completedTimeline = timeline.filterCompletedInstants();
+    Set<String> completedInstants = completedTimeline
+        .getInstants()
+        .stream()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+    List<String> pendingInstants = timeline
+        .getInstants()
+        .stream()
+        .map(HoodieInstant::getTimestamp)
+        .filter(t -> !completedInstants.contains(t))
+        .collect(Collectors.toList());
+    return !pendingInstants.isEmpty();
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
index 2818e1c50e6..08d8c68b0d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, MetadataConversionUtils}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.functional.PartitionStatsIndexTestBase.{checkIfOverlapped}
 import org.apache.hudi.metadata.HoodieBackedTableMetadata
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -202,7 +203,7 @@ class PartitionStatsIndexTestBase extends HoodieSparkClientTestBase {
   /**
    * @return [[DataFrame]] that should not exist as of the latest instant; used for non-existence validation.
    */
-  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+  protected def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = synchronized {
     val prevDfOpt = mergedDfList.lastOption
     if (prevDfOpt.isEmpty) {
       mergedDfList = mergedDfList :+ latestBatchDf
@@ -276,4 +277,25 @@ class PartitionStatsIndexTestBase extends HoodieSparkClientTestBase {
         false
     }
   }
+
+  // Check if the last instant has overlapped with other instants.
+  def checkIfCommitsAreConcurrent(): Boolean = {
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val timeline = metaClient.getActiveTimeline.filterCompletedInstants()
+    val instants = timeline.getInstants.asScala
+    val lastInstant = instants.last
+    val instantsWithoutLastOne = instants.dropRight(1).toList
+    findConcurrentInstants(lastInstant, instantsWithoutLastOne).nonEmpty
+  }
+
+  def findConcurrentInstants(givenInstant: HoodieInstant, instants: List[HoodieInstant]): List[HoodieInstant] = {
+    instants.filter(i => checkIfOverlapped(i, givenInstant))
+  }
+}
+
+object PartitionStatsIndexTestBase {
+  // Check if two completed instants are overlapped in time.
+  def checkIfOverlapped(a: HoodieInstant, b: HoodieInstant): Boolean = {
+    !(a.getCompletionTime.compareTo(b.getTimestamp) < 0 || a.getTimestamp.compareTo(b.getCompletionTime) > 0)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index cba98f4a834..501613b5f2e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
+import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -37,11 +37,12 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
 import org.apache.spark.sql.types.StringType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{Disabled, Tag, Test}
+import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
 
 import java.util.concurrent.Executors
+import java.util.stream.Stream
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -156,9 +157,8 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
    * Test case to do a write with updates and then validate partition stats with multi-writer.
    */
   @ParameterizedTest
-  @EnumSource(classOf[HoodieTableType])
-  @Disabled("HUDI-8234")
-  def testPartitionStatsWithMultiWriter(tableType: HoodieTableType): Unit = {
+  @MethodSource(Array("supplyTestArguments"))
+  def testPartitionStatsWithMultiWriter(tableType: HoodieTableType, useUpsert: Boolean): Unit = {
     val hudiOpts = commonOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
       HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
@@ -178,7 +178,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
       override def apply(): Boolean = {
         try {
           doWriteAndValidateDataAndPartitionStats(hudiOpts,
-            operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+            operation = if (useUpsert) UPSERT_OPERATION_OPT_VAL else BULK_INSERT_OPERATION_OPT_VAL,
             saveMode = SaveMode.Append,
             validate = false)
           true
@@ -197,9 +197,16 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
 
     Await.result(f1, Duration("5 minutes"))
     Await.result(f2, Duration("5 minutes"))
-
     assertTrue(f1.value.get.get || f2.value.get.get)
     executor.shutdownNow()
+
+    if (useUpsert) {
+      pollForTimeline(basePath, storageConf, 2)
+      assertTrue(hasPendingCommits)
+    } else {
+      pollForTimeline(basePath, storageConf, 3)
+      assertTrue(checkIfCommitsAreConcurrent())
+    }
     validateDataAndPartitionStats()
   }
 
@@ -401,3 +408,13 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
     new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, metadataWriter(getWriteConfig(opts)).getTableMetadata)
   }
 }
+
+object TestPartitionStatsIndex {
+  def supplyTestArguments(): Stream[Arguments] = {
+    List(
+      Arguments.of(HoodieTableType.MERGE_ON_READ, java.lang.Boolean.TRUE),
+      Arguments.of(HoodieTableType.MERGE_ON_READ, java.lang.Boolean.FALSE),
+      Arguments.of(HoodieTableType.COPY_ON_WRITE, java.lang.Boolean.TRUE),
+      Arguments.of(HoodieTableType.COPY_ON_WRITE, java.lang.Boolean.FALSE)).asJava.stream()
+  }
+}

Reply via email to