This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 59527c3590 [HUDI-4540] Cover different table types in functional tests 
of Spark structured streaming (#6317)
59527c3590 is described below

commit 59527c35903ee76e89ecc3c2bdc5806ac40691d1
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Aug 5 17:11:35 2022 -0700

    [HUDI-4540] Cover different table types in functional tests of Spark 
structured streaming (#6317)
---
 .../hudi/functional/TestStructuredStreaming.scala  | 164 +++++++++++----------
 1 file changed, 90 insertions(+), 74 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 85c64f8265..8981d1447b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -18,11 +18,13 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestTable}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, 
HoodieWriteConfig}
+import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieStorageConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
@@ -31,7 +33,9 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 
 import scala.collection.JavaConversions._
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -99,8 +103,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     (sourcePath, destPath)
   }
 
-  @Test
-  def testStructuredStreaming(): Unit = {
+  def getOptsWithTableType(tableType: HoodieTableType): Map[String, String] = {
+    commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+  }
+
+  def getClusteringOpts(tableType: HoodieTableType, isInlineClustering: String,
+                        isAsyncClustering: String, clusteringNumCommit: String,
+                        fileMaxRecordNum: Int): Map[String, String] = {
+    getOptsWithTableType(tableType) + (
+      HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> 
clusteringNumCommit,
+      DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
+      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> 
clusteringNumCommit,
+      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> 
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
+    )
+  }
+
+  def getCompactionOpts(tableType: HoodieTableType, isAsyncCompaction: 
Boolean): Map[String, String] = {
+    getOptsWithTableType(tableType) + (
+      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> 
isAsyncCompaction.toString,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
+    )
+  }
+
+  def structuredStreamingTestRunner(tableType: HoodieTableType, 
addCompactionConfigs: Boolean, isAsyncCompaction: Boolean): Unit = {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
     // First chunk of data
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -111,7 +137,12 @@ class TestStructuredStreaming extends HoodieClientTestBase 
{
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
-    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
commonOpts)
+    val hudiOptions = if (addCompactionConfigs) {
+      getCompactionOpts(tableType, isAsyncCompaction)
+    } else {
+      getOptsWithTableType(tableType)
+    }
+    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
hudiOptions)
 
     val f2 = Future {
       inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -125,16 +156,23 @@ class TestStructuredStreaming extends 
HoodieClientTestBase {
       assert(hoodieROViewDF1.count() == 100)
 
       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
-      // wait for spark streaming to process second microbatch
-      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
-      val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, 
destPath)
-      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
+      // When the compaction configs are added, one more commit of the 
compaction is expected
+      val numExpectedCommits = if (addCompactionConfigs) currNumCommits + 2 
else currNumCommits + 1
+      waitTillAtleastNCommits(fs, destPath, numExpectedCommits, 120, 5)
+
+      val commitInstantTime2 = if (tableType == HoodieTableType.MERGE_ON_READ) 
{
+        // For the records that are processed by the compaction in MOR table
+        // the "_hoodie_commit_time" still reflects the latest delta commit
+        latestInstant(fs, destPath, HoodieTimeline.DELTA_COMMIT_ACTION)
+      } else {
+        HoodieDataSourceHelpers.latestCommit(fs, destPath)
+      }
+      assertEquals(numExpectedCommits, 
HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
       // Read RO View
       val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
         .load(destPath + "/*/*/*/*")
       assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only 
updated
 
-
       // Read Incremental View
       // we have 2 commits, try pulling the first commit (which is not the 
latest)
       val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").get(0)
@@ -163,6 +201,12 @@ class TestStructuredStreaming extends HoodieClientTestBase 
{
     Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testStructuredStreaming(tableType: HoodieTableType): Unit = {
+    structuredStreamingTestRunner(tableType, false, false)
+  }
+
   @throws[InterruptedException]
   private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
                                       numCommits: Int, timeoutSecs: Int, 
sleepSecsAfterEachRun: Int) = {
@@ -178,8 +222,6 @@ class TestStructuredStreaming extends HoodieClientTestBase {
         numInstants = timeline.countInstants
         success = true
       }
-      val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath)
-      .setLoadActiveTimelineOnLoad(true).build()
     } catch {
       case te: TableNotFoundException =>
         log.info("Got table not found exception. Retrying")
@@ -193,61 +235,30 @@ class TestStructuredStreaming extends 
HoodieClientTestBase {
     numInstants
   }
 
-  def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, 
isAsyncCompaction: String,
-                        clusteringNumCommit: String, fileMaxRecordNum: 
Int):Map[String, String] = {
-    commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key -> 
isInlineClustering,
-      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> 
clusteringNumCommit,
-      DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
-      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
-      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> 
clusteringNumCommit,
-      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> 
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
-    )
-  }
-
-  @Test
-  def testStructuredStreamingWithInlineClustering(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStructuredStreamingWithClustering(isAsyncClustering: Boolean): Unit 
= {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
 
-    def checkClusteringResult(destPath: String):Unit = {
+    def checkClusteringResult(destPath: String): Unit = {
       // check have schedule clustering and clustering file group to one
       waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, 
false, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
-  }
-
-  @Test
-  def testStructuredStreamingWithAsyncClustering(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
 
-    def checkClusteringResult(destPath: String):Unit = {
-      // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
-      metaClient.reloadActiveTimeline()
-      assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, 
true, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, 
HoodieTableType.COPY_ON_WRITE,
+      !isAsyncClustering, isAsyncClustering, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
   }
 
-  @Test
-  def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
-
-    def checkClusteringResult(destPath: String):Unit = {
-      // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
-      metaClient.reloadActiveTimeline()
-      assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, 
true, true,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStructuredStreamingWithCompaction(isAsyncCompaction: Boolean): Unit 
= {
+    structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true, 
isAsyncCompaction)
   }
 
-  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String, isInlineClustering: Boolean,
-                                                 isAsyncClustering: Boolean, 
isAsyncCompaction: Boolean,
+  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String, tableType: HoodieTableType,
+                                                 isInlineClustering: Boolean, 
isAsyncClustering: Boolean,
                                                  partitionOfRecords: String, 
checkClusteringResult: String => Unit): Unit = {
     // First insert of data
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 
100, partitionOfRecords)).toList
@@ -257,8 +268,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 
100, partitionOfRecords)).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
 
-    val hudiOptions = getClusteringOpts(isInlineClustering.toString, 
isAsyncClustering.toString,
-      isAsyncCompaction.toString, "2", 100)
+    val hudiOptions = getClusteringOpts(
+      tableType, isInlineClustering.toString, isAsyncClustering.toString, "2", 
100)
     val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
hudiOptions)
 
     val f2 = Future {
@@ -270,28 +281,24 @@ class TestStructuredStreaming extends 
HoodieClientTestBase {
       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process second microbatch
       currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 
1, 120, 5)
-      // for inline clustering, clustering may be complete along with 2nd 
commit
-      if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, 
destPath).getCompletedReplaceTimeline.countInstants() > 0) {
-        assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
-        // check have at least one file group
-        this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-          .setLoadActiveTimelineOnLoad(true).build()
-        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
-      } else {
-        assertEquals(currNumCommits, 
HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
-        // check have more than one file group
-        this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-          .setLoadActiveTimelineOnLoad(true).build()
-        assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
-      }
 
-      // check clustering result
+      // Wait for the clustering to finish
+      this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+        .setLoadActiveTimelineOnLoad(true).build()
       checkClusteringResult(destPath)
 
-      // check data correct after clustering
+      assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
+      // Check have at least one file group
+      assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
+
+      // Validate data after clustering
       val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
         .load(destPath + "/*/*/*/*")
       assertEquals(200, hoodieROViewDF2.count())
+      val countsPerCommit = 
hoodieROViewDF2.groupBy("_hoodie_commit_time").count().collect()
+      assertEquals(2, countsPerCommit.length)
+      val commitInstantTime2 = latestInstant(fs, destPath, 
HoodieTimeline.COMMIT_ACTION)
+      assertEquals(commitInstantTime2, countsPerCommit.maxBy(row => 
row.getAs[String](0)).get(0))
     }
     Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
   }
@@ -327,4 +334,13 @@ class TestStructuredStreaming extends HoodieClientTestBase 
{
     if (!success) throw new IllegalStateException("Timed-out waiting for 
completing replace instant appear in " + tablePath)
   }
 
+  private def latestInstant(fs: FileSystem, basePath: String, instantAction: 
String): String = {
+    val metaClient = HoodieTableMetaClient.builder
+      
.setConf(fs.getConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build
+    metaClient.getActiveTimeline
+      .getTimelineOfActions(CollectionUtils.createSet(instantAction))
+      .filterCompletedInstants
+      .lastInstant
+      .get.getTimestamp
+  }
 }

Reply via email to