This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9e6889a [HUDI-1481] add structured streaming and delta streamer clustering unit test (#2360)
9e6889a is described below
commit 9e6889a8ce522ec455a11d6f6cf4949255c91a0f
Author: lw0090 <[email protected]>
AuthorDate: Mon Dec 28 12:27:09 2020 +0800
[HUDI-1481] add structured streaming and delta streamer clustering unit test (#2360)
---
.../apache/hudi/functional/TestCOWDataSource.scala | 2 +-
.../hudi/functional/TestStructuredStreaming.scala | 165 ++++++++++++++++++---
.../functional/TestHoodieDeltaStreamer.java | 51 +++++--
3 files changed, 183 insertions(+), 35 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 0386f20..51ca72e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -115,7 +115,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
// Upsert Operation without Hudi metadata columns
val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
- val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 ,
2))
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
inputDF2.write.format("org.apache.hudi")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 7a902c1..b07f00f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -18,16 +18,20 @@
package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
HoodieTestTable}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig,
HoodieWriteConfig}
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieDataSourceHelpers}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.apache.spark.sql.types.StructType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.collection.JavaConversions._
@@ -64,35 +68,19 @@ class TestStructuredStreaming extends HoodieClientTestBase {
cleanupFileSystem()
}
- @Test
- def testStructuredStreaming(): Unit = {
- fs.delete(new Path(basePath), true)
- val sourcePath = basePath + "/source"
- val destPath = basePath + "/dest"
- fs.mkdirs(new Path(sourcePath))
-
- // First chunk of data
- val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
- val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-
- // Second chunk of data
- val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
- val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
- val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-
+ def initStreamingWriteFuture(schema: StructType, sourcePath: String,
destPath: String, hudiOptions: Map[String, String]): Future[Unit] = {
// define the source of streaming
val streamingInput =
spark.readStream
- .schema(inputDF1.schema)
+ .schema(schema)
.json(sourcePath)
-
- val f1 = Future {
+ Future {
println("streaming starting")
//'writeStream' can be called only on streaming Dataset/DataFrame
streamingInput
.writeStream
.format("org.apache.hudi")
- .options(commonOpts)
+ .options(hudiOptions)
.trigger(Trigger.ProcessingTime(100))
.option("checkpointLocation", basePath + "/checkpoint")
.outputMode(OutputMode.Append)
@@ -100,6 +88,29 @@ class TestStructuredStreaming extends HoodieClientTestBase {
.awaitTermination(10000)
println("streaming ends")
}
+ }
+
+  /**
+   * Wipes `basePath` and lays out fresh streaming directories beneath it.
+   *
+   * @param sourceDirName directory name (under `basePath`) for the streaming JSON source; it is created
+   * @param destDirName   directory name (under `basePath`) for the Hudi table; only the path is computed
+   * @return `(sourcePath, destPath)` as absolute path strings
+   */
+  def initStreamingSourceAndDestPath(sourceDirName: String, destDirName: String): (String, String) = {
+    val paths = (s"$basePath/$sourceDirName", s"$basePath/$destDirName")
+    fs.delete(new Path(basePath), true)
+    fs.mkdirs(new Path(paths._1))
+    paths
+  }
+
+ @Test
+ def testStructuredStreaming(): Unit = {
+ val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
+ // First chunk of data
+ val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ // Second chunk of data
+ val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
+
+ val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath,
commonOpts)
val f2 = Future {
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -113,7 +124,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
assert(hoodieROViewDF1.count() == 100)
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
- // wait for spark streaming to process one microbatch
+ // wait for spark streaming to process second microbatch
waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs,
destPath)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath,
"000").size())
@@ -177,4 +188,112 @@ class TestStructuredStreaming extends HoodieClientTestBase {
if (!success) throw new IllegalStateException("Timed-out waiting for " +
numCommits + " commits to appear in " + tablePath)
numInstants
}
+
+  /**
+   * Writer options for the clustering tests: `commonOpts` extended with inline-clustering settings
+   * and a parquet max-file-size derived from a record count.
+   *
+   * @param isInlineClustering  value for `HoodieClusteringConfig.INLINE_CLUSTERING_PROP`
+   * @param clusteringNumCommit value for `HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP`
+   * @param fileMaxRecordNum    record count fed to `dataGen.getEstimatedFileSizeInBytes`; the result is
+   *                            used as `HoodieStorageConfig.PARQUET_FILE_MAX_BYTES`
+   */
+  def getInlineClusteringOpts(isInlineClustering: String, clusteringNumCommit: String, fileMaxRecordNum: Int): Map[String, String] = {
+    val parquetMaxBytes = dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
+    commonOpts ++ Map(
+      HoodieClusteringConfig.INLINE_CLUSTERING_PROP -> isInlineClustering,
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP -> clusteringNumCommit,
+      HoodieStorageConfig.PARQUET_FILE_MAX_BYTES -> parquetMaxBytes)
+  }
+
+  @Test
+  def testStructuredStreamingWithInlineClustering(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+    // With inline clustering enabled: wait for a completed replace instant, then expect the
+    // first partition to be down to a single file group.
+    val expectSingleClusteredFileGroup: String => Unit = { tablePath =>
+      waitTillHasCompletedReplaceInstant(tablePath, 120, 5)
+      metaClient.reloadActiveTimeline()
+      assertEquals(1, getLatestFileGroupsFileId.size)
+    }
+
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, expectSingleClusteredFileGroup)
+  }
+
+  @Test
+  def testStructuredStreamingWithoutInlineClustering(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+    // With inline clustering disabled, no replace instant should ever complete, so waiting for one
+    // must time out with IllegalStateException. The failure message below is shown only when a
+    // replace commit DID complete, hence the negative wording.
+    // NOTE(review): this always waits out the full 120s timeout.
+    def checkClusteringResult(destPath: String): Unit = {
+      assertThrows(classOf[IllegalStateException], new Executable {
+        override def execute(): Unit = waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      }, "Should not have replace commit completed")
+    }
+
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, checkClusteringResult)
+  }
+
+  /**
+   * Shared driver for the streaming clustering tests.
+   *
+   * Starts a structured-streaming write from `sourcePath` into the Hudi table at `destPath` (future
+   * `f1`). Concurrently (future `f2`) it:
+   *   - appends two batches of 100 inserts each for the first test partition;
+   *   - waits for the two commits;
+   *   - asserts that more than one file group exists;
+   *   - runs `checkClusteringResult`;
+   *   - asserts that all 200 records are readable.
+   *
+   * @param sourcePath            directory the streaming query reads JSON from
+   * @param destPath              base path of the Hudi table being written
+   * @param isInlineClustering    passed (as a string) to `getInlineClusteringOpts`
+   * @param checkClusteringResult assertion run against `destPath` after the second commit
+   */
+  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String,
+                                                 isInlineClustering: Boolean, checkClusteringResult: String => Unit): Unit = {
+    // First insert of data
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+    // Second insert of data
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+
+    // INLINE_CLUSTERING_MAX_COMMIT_PROP = "2"; PARQUET_FILE_MAX_BYTES is sized from a 100-record estimate
+    // (presumably so each batch spreads over several file groups — verify).
+    val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100)
+    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
+
+    // Feeds the source directory and verifies the table while the streaming query in f1 runs.
+    val f2 = Future {
+      inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+      // wait for spark streaming to process one microbatch
+      val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
+      assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
+
+      inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+      // wait for spark streaming to process second microbatch
+      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
+      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+
+      // check have more than one file group
+      // (this.metaClient is re-pointed at destPath because getLatestFileGroupsFileId reads it)
+      this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+      assertTrue(getLatestFileGroupsFileId().size > 1)
+
+      // check clustering result
+      checkClusteringResult(destPath)
+
+      // check data correct after clustering
+      val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
+        .load(destPath + "/*/*/*/*")
+      assertEquals(200, hoodieROViewDF2.count())
+    }
+    // Block until both futures finish; a failure inside either one is rethrown by Await.result.
+    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+  }
+
+  /**
+   * Returns the file ids of the latest file slices in the first test partition
+   * (`HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH`).
+   *
+   * Reads `this.metaClient` (its active timeline and the base files listed via `HoodieTestTable`).
+   * Callers in this file refresh or recreate `metaClient` before calling.
+   */
+  private def getLatestFileGroupsFileId():Array[String] = {
+    // NOTE(review): the return value is discarded. This relies on the call (re)initialising the
+    // inherited `tableView` field read on the next line, so the two statements must stay in this
+    // order. Confirm in the test base class.
+    getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline,
+      HoodieTestTable.of(metaClient).listAllBaseFiles())
+    tableView.getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+      // Stream.toArray() yields untyped elements, hence the cast back to FileSlice.
+      .toArray().map(slice => slice.asInstanceOf[FileSlice].getFileGroupId.getFileId)
+  }
+
+  /**
+   * Polls `this.metaClient`'s active timeline until at least one completed replace instant exists.
+   *
+   * `this.metaClient` must already point at the table under test. `tablePath` is only used in the
+   * timeout message. A `TableNotFoundException` while polling is logged and retried.
+   *
+   * @param tablePath             base path of the watched table (for the error message)
+   * @param timeoutSecs           total polling budget in seconds
+   * @param sleepSecsAfterEachRun pause between unsuccessful polls, in seconds
+   * @throws IllegalStateException if no completed replace instant appears within `timeoutSecs`
+   */
+  @throws[InterruptedException]
+  private def waitTillHasCompletedReplaceInstant(tablePath: String,
+                                                 timeoutSecs: Int,
+                                                 sleepSecsAfterEachRun: Int): Unit = {
+    val beginTime = System.currentTimeMillis
+    val timeoutMsecs = timeoutSecs * 1000L
+    var success = false
+    while (!success && (System.currentTimeMillis - beginTime) < timeoutMsecs) {
+      try {
+        this.metaClient.reloadActiveTimeline()
+        val completeReplaceSize =
+          this.metaClient.getActiveTimeline.getCompletedReplaceTimeline().getInstants.toArray.size
+        println("completeReplaceSize:" + completeReplaceSize)
+        success = completeReplaceSize > 0
+      } catch {
+        case _: TableNotFoundException =>
+          log.info("Got table not found exception. Retrying")
+      }
+      // Back off only when another poll is needed, so a successful wait returns immediately.
+      if (!success) {
+        Thread.sleep(sleepSecsAfterEachRun * 1000L)
+      }
+    }
+    if (!success) {
+      throw new IllegalStateException("Timed-out waiting for a completed replace instant to appear in " + tablePath)
+    }
+  }
+
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 9b0097e..6966e2c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
@@ -622,23 +623,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String tableBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
-
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d",
SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfg.configs.add(String.format("%s=false",
HoodieCompactionConfig.AUTO_CLEAN_PROP));
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
- try {
- ds.sync();
- } catch (Exception ex) {
- throw new RuntimeException(ex.getMessage(), ex);
- }
- });
- TestHelpers.waitTillCondition((r) -> {
+ deltaStreamerTestRunner(cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
@@ -648,11 +640,48 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
TestHelpers.assertRecordCount(totalRecords, tableBasePath +
"/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath +
"/*/*.parquet", sqlContext);
return true;
- }, 180);
+ });
+ }
+
+  /**
+   * Runs {@code cfg} in a background {@link HoodieDeltaStreamer} and polls {@code condition} through
+   * {@code TestHelpers.waitTillCondition} (timeout argument 180). It then shuts the streamer down
+   * gracefully. Any failure thrown by {@code sync()} is surfaced through {@code dsFuture.get()}.
+   *
+   * @param cfg       delta streamer configuration to run
+   * @param condition check evaluated by {@code TestHelpers.waitTillCondition}
+   */
+  private void deltaStreamerTestRunner(HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<?> dsFuture = executor.submit(() -> {
+      try {
+        ds.sync();
+      } catch (Exception ex) {
+        throw new RuntimeException(ex.getMessage(), ex);
+      }
+    });
+    // The already-submitted sync task still runs to completion. shutdown() only stops new tasks and
+    // lets the worker thread exit afterwards, instead of leaking one thread per test.
+    executor.shutdown();
+
+    TestHelpers.waitTillCondition(condition, 180);
     ds.shutdownGracefully();
     dsFuture.get();
   }
+
+  /**
+   * Runs a continuous-mode MERGE_ON_READ delta streamer with {@code INLINE_CLUSTERING_PROP=true} and
+   * {@code INLINE_CLUSTERING_MAX_COMMIT_PROP=2}. The test passes once at least one completed replace
+   * instant is visible on the table.
+   */
+  @Test
+  public void testInlineClustering() throws Exception {
+    String tableBasePath = dfsBasePath + "/inlineClustering";
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    // Continuous-mode UPSERT ingestion (not a bulk insert) with auto-clean off and inline clustering on.
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
+    cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true"));
+    cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
+
+    deltaStreamerTestRunner(cfg, (r) -> {
+      // A fresh meta client per poll so the latest timeline is read.
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+      // The pending count is only logged, as a progress indicator.
+      int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
+      int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
+      LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
+      return completeReplaceSize > 0;
+    });
+  }
+
/**
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental
processing using a 2 step pipeline The first
* step involves using a SQL template to transform a source TEST-DATA-SOURCE
============================> HUDI TABLE