This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e3ca834366 [MINOR] Fixing `TestStructuredStreaming` test (#7736)
5e3ca834366 is described below

commit 5e3ca8343668e09b37fc913981edd0761fa59359
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Mon Jan 23 23:37:52 2023 -0800

    [MINOR] Fixing `TestStructuredStreaming` test (#7736)
    
    * Fixed Structured Streaming test not using lock providers (LPs)
    
    * Setting timeouts for `TestStructuredStreaming` test
    
    * Restructured `TestStructuredStreaming` tests to shut down properly
---
 .../hudi/functional/TestStructuredStreaming.scala  | 75 +++++++++++++---------
 1 file changed, 43 insertions(+), 32 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 7bfdf05b1b9..a26858dabb4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -20,20 +20,21 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
 import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType, 
WriteConcurrencyMode}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestTable}
 import org.apache.hudi.common.util.{CollectionUtils, CommitUtils}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers, HoodieSinkCheckpoint}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
-import org.apache.spark.sql.streaming.{OutputMode, Trigger}
+import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, 
StreamingQuery, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
@@ -50,7 +51,9 @@ import scala.concurrent.{Await, Future}
  */
 class TestStructuredStreaming extends HoodieClientTestBase {
   private val log = LogManager.getLogger(getClass)
-  var spark: SparkSession = null
+
+  var spark: SparkSession = _
+
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -61,34 +64,29 @@ class TestStructuredStreaming extends HoodieClientTestBase {
   )
 
   @BeforeEach override def setUp() {
-    initPath()
-    initSparkContexts()
+    super.setUp()
     spark = sqlContext.sparkSession
-    initTestDataGenerator()
-    initFileSystem()
-    initTimelineService()
+    // We set stop to timeout after 30s to avoid blocking things indefinitely
+    spark.conf.set("spark.sql.streaming.stopTimeout", 30000)
   }
 
-  def initStreamingWriteFuture(schema: StructType, sourcePath: String, 
destPath: String, hudiOptions: Map[String, String]): Future[Unit] = {
-    // define the source of streaming
+  def initWritingStreamingQuery(schema: StructType,
+                                sourcePath: String,
+                                destPath: String,
+                                hudiOptions: Map[String, String]): 
StreamingQuery = {
     val streamingInput =
       spark.readStream
         .schema(schema)
         .json(sourcePath)
-    Future {
-      println("streaming starting")
-      //'writeStream' can be called only on streaming Dataset/DataFrame
-      streamingInput
-        .writeStream
-        .format("org.apache.hudi")
-        .options(hudiOptions)
-        .trigger(Trigger.ProcessingTime(100))
-        .option("checkpointLocation", basePath + "/checkpoint")
-        .outputMode(OutputMode.Append)
-        .start(destPath)
-        .awaitTermination(10000)
-      println("streaming ends")
-    }
+
+    streamingInput
+      .writeStream
+      .format("org.apache.hudi")
+      .options(hudiOptions)
+      .trigger(Trigger.ProcessingTime(1000))
+      .option("checkpointLocation", basePath + "/checkpoint")
+      .outputMode(OutputMode.Append)
+      .start(destPath)
   }
 
   def initStreamingSourceAndDestPath(sourceDirName: String, destDirName: 
String): (String, String) = {
@@ -138,7 +136,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     } else {
       getOptsWithTableType(tableType)
     }
-    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
hudiOptions)
+
+    val streamingQuery = initWritingStreamingQuery(inputDF1.schema, 
sourcePath, destPath, hudiOptions)
 
     val f2 = Future {
       inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -193,8 +192,11 @@ class TestStructuredStreaming extends HoodieClientTestBase 
{
       countsPerCommit = 
hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
       assertEquals(1, countsPerCommit.length)
       assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+      streamingQuery.stop()
     }
-    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+
+    Await.result(f2, Duration("120s"))
   }
 
   @ParameterizedTest
@@ -259,6 +261,11 @@ class TestStructuredStreaming extends HoodieClientTestBase 
{
   def testStructuredStreamingWithCheckpoint(): Unit = {
     val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
 
+    val opts: Map[String, String] = commonOpts ++ Map(
+      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key -> 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
+      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> 
classOf[InProcessLockProvider].getName
+    )
+
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 
100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     val schema = inputDF1.schema
@@ -270,7 +277,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       .json(sourcePath)
       .writeStream
       .format("org.apache.hudi")
-      .options(commonOpts)
+      .options(opts)
       .outputMode(OutputMode.Append)
       .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
       .option("checkpointLocation", s"$basePath/checkpoint1")
@@ -293,7 +300,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       .json(sourcePath)
       .writeStream
       .format("org.apache.hudi")
-      .options(commonOpts)
+      .options(opts)
       .outputMode(OutputMode.Append)
       .option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier2")
       .option("checkpointLocation", s"$basePath/checkpoint2")
@@ -318,7 +325,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     assertTrue(lastCheckpointCommitMetadata.isPresent)
     val checkpointMap = 
HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))
 
-    assertEquals(checkpointMap.get(identifier).orNull, expectBatchId)
+    assertEquals(expectBatchId, checkpointMap.get(identifier).orNull)
   }
 
   def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String, tableType: HoodieTableType,
@@ -334,7 +341,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     val hudiOptions = getClusteringOpts(
       tableType, isInlineClustering.toString, isAsyncClustering.toString, "2", 
100)
-    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
hudiOptions)
+
+    val streamingQuery = initWritingStreamingQuery(inputDF1.schema, 
sourcePath, destPath, hudiOptions)
 
     val f2 = Future {
       inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -363,8 +371,11 @@ class TestStructuredStreaming extends HoodieClientTestBase 
{
       assertEquals(2, countsPerCommit.length)
       val commitInstantTime2 = latestInstant(fs, destPath, 
HoodieTimeline.COMMIT_ACTION)
       assertEquals(commitInstantTime2, countsPerCommit.maxBy(row => 
row.getAs[String](0)).get(0))
+
+      streamingQuery.stop()
     }
-    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+
+    Await.result(f2, Duration("120s"))
   }
 
   private def getLatestFileGroupsFileId(partition: String):Array[String] = {

Reply via email to