This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3ca834366 [MINOR] Fixing `TestStructuredStreaming` test (#7736)
5e3ca834366 is described below
commit 5e3ca8343668e09b37fc913981edd0761fa59359
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Mon Jan 23 23:37:52 2023 -0800
[MINOR] Fixing `TestStructuredStreaming` test (#7736)
* Fixed Structured Streaming test not using lock providers (LPs)
* Setting timeouts for `TestStructuredStreaming` test
* Restructured `TestStructuredStreaming` tests to shut down properly
---
.../hudi/functional/TestStructuredStreaming.scala | 75 +++++++++++++---------
1 file changed, 43 insertions(+), 32 deletions(-)
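In outline: the old test wrapped the streaming write in a Future that merely called awaitTermination(10000), so nothing ever stopped the query explicitly, and the final Await used Duration.Inf, so a hung stream blocked the suite forever. The patch has the helper return the StreamingQuery handle, stops it from the assertion block, and bounds both stop() and the await. A minimal sketch of the resulting pattern, assuming a SparkSession named `spark` is in scope; names like startHudiQuery and inputSchema are illustrative, not the exact test code:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
    import org.apache.spark.sql.types.StructType

    // Start the streaming write and hand the query back to the caller, which
    // now owns its lifecycle instead of awaiting a fixed 10-second window.
    // Assumes a SparkSession named `spark` is in scope (as in the test class).
    def startHudiQuery(schema: StructType, sourcePath: String, destPath: String): StreamingQuery =
      spark.readStream
        .schema(schema)
        .json(sourcePath)
        .writeStream
        .format("org.apache.hudi")
        .trigger(Trigger.ProcessingTime(1000))
        .option("checkpointLocation", destPath + "/checkpoint")
        .outputMode(OutputMode.Append)
        .start(destPath)

    val query = startHudiQuery(inputSchema, sourcePath, destPath)
    val assertions = Future {
      // ... feed input batches, wait for commits, run assertions ...
      query.stop() // bounded by spark.sql.streaming.stopTimeout (set in setUp)
    }
    // A bounded await turns a wedged stream into a test failure, not a hang.
    Await.result(assertions, 120.seconds)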
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 7bfdf05b1b9..a26858dabb4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -20,20 +20,21 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER
import org.apache.hudi.HoodieSinkCheckpoint.SINK_CHECKPOINT_KEY
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestTable}
import org.apache.hudi.common.util.{CollectionUtils, CommitUtils}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSinkCheckpoint}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
-import org.apache.spark.sql.streaming.{OutputMode, Trigger}
+import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -50,7 +51,9 @@ import scala.concurrent.{Await, Future}
*/
class TestStructuredStreaming extends HoodieClientTestBase {
private val log = LogManager.getLogger(getClass)
- var spark: SparkSession = null
+
+ var spark: SparkSession = _
+
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -61,34 +64,29 @@ class TestStructuredStreaming extends HoodieClientTestBase {
)
@BeforeEach override def setUp() {
- initPath()
- initSparkContexts()
+ super.setUp()
spark = sqlContext.sparkSession
- initTestDataGenerator()
- initFileSystem()
- initTimelineService()
+ // We set stop to timeout after 30s to avoid blocking things indefinitely
+ spark.conf.set("spark.sql.streaming.stopTimeout", 30000)
}
- def initStreamingWriteFuture(schema: StructType, sourcePath: String, destPath: String, hudiOptions: Map[String, String]): Future[Unit] = {
- // define the source of streaming
+ def initWritingStreamingQuery(schema: StructType,
+ sourcePath: String,
+ destPath: String,
+ hudiOptions: Map[String, String]): StreamingQuery = {
val streamingInput =
spark.readStream
.schema(schema)
.json(sourcePath)
- Future {
- println("streaming starting")
- //'writeStream' can be called only on streaming Dataset/DataFrame
- streamingInput
- .writeStream
- .format("org.apache.hudi")
- .options(hudiOptions)
- .trigger(Trigger.ProcessingTime(100))
- .option("checkpointLocation", basePath + "/checkpoint")
- .outputMode(OutputMode.Append)
- .start(destPath)
- .awaitTermination(10000)
- println("streaming ends")
- }
+
+ streamingInput
+ .writeStream
+ .format("org.apache.hudi")
+ .options(hudiOptions)
+ .trigger(Trigger.ProcessingTime(1000))
+ .option("checkpointLocation", basePath + "/checkpoint")
+ .outputMode(OutputMode.Append)
+ .start(destPath)
}
def initStreamingSourceAndDestPath(sourceDirName: String, destDirName: String): (String, String) = {
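The setUp() change above is what makes the later stop() calls safe to rely on: spark.sql.streaming.stopTimeout defaults to 0, meaning StreamingQuery.stop() waits indefinitely for the stream execution thread, so a stuck micro-batch could hang the whole suite. A minimal restatement of the setting (the 30s value mirrors the patch):

    // With the default of 0, stop() blocks forever on a wedged stream; with a
    // bound, it throws java.util.concurrent.TimeoutException after 30 seconds.
    spark.conf.set("spark.sql.streaming.stopTimeout", 30000) // milliseconds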
@@ -138,7 +136,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
} else {
getOptsWithTableType(tableType)
}
- val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
+
+ val streamingQuery = initWritingStreamingQuery(inputDF1.schema, sourcePath, destPath, hudiOptions)
val f2 = Future {
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -193,8 +192,11 @@ class TestStructuredStreaming extends HoodieClientTestBase {
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+ streamingQuery.stop()
}
- Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+
+ Await.result(f2, Duration("120s"))
}
@ParameterizedTest
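Since the assertion future now stops the query itself, only f2 needs awaiting, and the bound can be finite; the old Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf) could block forever if either future hung. For reference, Duration("120s") and 120.seconds build the same bound:

    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Await.result throws java.util.concurrent.TimeoutException once 120s elapse,
    // failing the test run instead of hanging the CI job indefinitely.
    Await.result(f2, Duration("120s")) // equivalently: Await.result(f2, 120.seconds)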
@@ -259,6 +261,11 @@ class TestStructuredStreaming extends HoodieClientTestBase {
def testStructuredStreamingWithCheckpoint(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+ val opts: Map[String, String] = commonOpts ++ Map(
+ HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> classOf[InProcessLockProvider].getName
+ )
+
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
val schema = inputDF1.schema
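The added options are the lock-provider fix from the commit message: this test writes to one table through two checkpointed streams, which Hudi treats as a multi-writer setup, so it needs optimistic concurrency control plus a lock provider; InProcessLockProvider suffices because both writers live in the same JVM. Restated in isolation (keys exactly as in the patch):

    import org.apache.hudi.client.transaction.lock.InProcessLockProvider
    import org.apache.hudi.common.model.WriteConcurrencyMode
    import org.apache.hudi.config.{HoodieLockConfig, HoodieWriteConfig}

    val occOpts: Map[String, String] = Map(
      // Let concurrent writers share the table under optimistic locking.
      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key ->
        WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name,
      // JVM-local lock provider: fine for tests, not for distributed writers.
      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key ->
        classOf[InProcessLockProvider].getName
    )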
@@ -270,7 +277,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
.json(sourcePath)
.writeStream
.format("org.apache.hudi")
- .options(commonOpts)
+ .options(opts)
.outputMode(OutputMode.Append)
.option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier1")
.option("checkpointLocation", s"$basePath/checkpoint1")
@@ -293,7 +300,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
.json(sourcePath)
.writeStream
.format("org.apache.hudi")
- .options(commonOpts)
+ .options(opts)
.outputMode(OutputMode.Append)
.option(STREAMING_CHECKPOINT_IDENTIFIER.key(), "streaming_identifier2")
.option("checkpointLocation", s"$basePath/checkpoint2")
@@ -318,7 +325,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
assertTrue(lastCheckpointCommitMetadata.isPresent)
val checkpointMap = HoodieSinkCheckpoint.fromJson(lastCheckpointCommitMetadata.get().getMetadata(SINK_CHECKPOINT_KEY))
- assertEquals(checkpointMap.get(identifier).orNull, expectBatchId)
+ assertEquals(expectBatchId, checkpointMap.get(identifier).orNull)
}
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, tableType: HoodieTableType,
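The one-line swap above fixes the assertion's argument order rather than its logic: JUnit 5's assertEquals takes (expected, actual), so reversed arguments produce an inverted failure message. Illustrative values:

    import org.junit.jupiter.api.Assertions.assertEquals

    val expectBatchId = "5" // hypothetical values for illustration
    val actualBatchId = "7"
    assertEquals(expectBatchId, actualBatchId)
    // fails with "expected: <5> but was: <7>"; the reversed call would
    // misreport the actual value as the expectation.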
@@ -334,7 +341,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val hudiOptions = getClusteringOpts(
tableType, isInlineClustering.toString, isAsyncClustering.toString, "2", 100)
- val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
+
+ val streamingQuery = initWritingStreamingQuery(inputDF1.schema, sourcePath, destPath, hudiOptions)
val f2 = Future {
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -363,8 +371,11 @@ class TestStructuredStreaming extends HoodieClientTestBase {
assertEquals(2, countsPerCommit.length)
val commitInstantTime2 = latestInstant(fs, destPath, HoodieTimeline.COMMIT_ACTION)
assertEquals(commitInstantTime2, countsPerCommit.maxBy(row => row.getAs[String](0)).get(0))
+
+ streamingQuery.stop()
}
- Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+
+ Await.result(f2, Duration("120s"))
}
private def getLatestFileGroupsFileId(partition: String):Array[String] = {