This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e6889a  [HUDI-1481]  add  structured streaming and delta streamer 
clustering unit test (#2360)
9e6889a is described below

commit 9e6889a8ce522ec455a11d6f6cf4949255c91a0f
Author: lw0090 <[email protected]>
AuthorDate: Mon Dec 28 12:27:09 2020 +0800

    [HUDI-1481]  add  structured streaming and delta streamer clustering unit 
test (#2360)
---
 .../apache/hudi/functional/TestCOWDataSource.scala |   2 +-
 .../hudi/functional/TestStructuredStreaming.scala  | 165 ++++++++++++++++++---
 .../functional/TestHoodieDeltaStreamer.java        |  51 +++++--
 3 files changed, 183 insertions(+), 35 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 0386f20..51ca72e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -115,7 +115,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     // Upsert Operation without Hudi metadata columns
     val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
-    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 
2))
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
     inputDF2.write.format("org.apache.hudi")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 7a902c1..b07f00f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -18,16 +18,20 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestTable}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, 
HoodieWriteConfig}
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.apache.spark.sql.types.StructType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.collection.JavaConversions._
@@ -64,35 +68,19 @@ class TestStructuredStreaming extends HoodieClientTestBase {
     cleanupFileSystem()
   }
 
-  @Test
-  def testStructuredStreaming(): Unit = {
-    fs.delete(new Path(basePath), true)
-    val sourcePath = basePath + "/source"
-    val destPath = basePath + "/dest"
-    fs.mkdirs(new Path(sourcePath))
-
-    // First chunk of data
-    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-
-    // Second chunk of data
-    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
-    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
-    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-
+  def initStreamingWriteFuture(schema: StructType, sourcePath: String, 
destPath: String, hudiOptions: Map[String, String]): Future[Unit] = {
     // define the source of streaming
     val streamingInput =
       spark.readStream
-        .schema(inputDF1.schema)
+        .schema(schema)
         .json(sourcePath)
-
-    val f1 = Future {
+    Future {
       println("streaming starting")
       //'writeStream' can be called only on streaming Dataset/DataFrame
       streamingInput
         .writeStream
         .format("org.apache.hudi")
-        .options(commonOpts)
+        .options(hudiOptions)
         .trigger(Trigger.ProcessingTime(100))
         .option("checkpointLocation", basePath + "/checkpoint")
         .outputMode(OutputMode.Append)
@@ -100,6 +88,29 @@ class TestStructuredStreaming extends HoodieClientTestBase {
         .awaitTermination(10000)
       println("streaming ends")
     }
+  }
+
+  def initStreamingSourceAndDestPath(sourceDirName: String, destDirName: 
String): (String, String) = {
+    fs.delete(new Path(basePath), true)
+    val sourcePath = basePath + "/" + sourceDirName
+    val destPath = basePath + "/" + destDirName
+    fs.mkdirs(new Path(sourcePath))
+    (sourcePath, destPath)
+  }
+
+  @Test
+  def testStructuredStreaming(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
+    // First chunk of data
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+    // Second chunk of data
+    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
+
+    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
commonOpts)
 
     val f2 = Future {
       inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -113,7 +124,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       assert(hoodieROViewDF1.count() == 100)
 
       inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
-      // wait for spark streaming to process one microbatch
+      // wait for spark streaming to process second microbatch
       waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
       val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, 
destPath)
       assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
@@ -177,4 +188,112 @@ class TestStructuredStreaming extends 
HoodieClientTestBase {
     if (!success) throw new IllegalStateException("Timed-out waiting for " + 
numCommits + " commits to appear in " + tablePath)
     numInstants
   }
+
+  def getInlineClusteringOpts( isInlineClustering: String, 
clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
+    commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP -> 
isInlineClustering,
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP -> 
clusteringNumCommit,
+      HoodieStorageConfig.PARQUET_FILE_MAX_BYTES -> 
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
+    )
+  }
+
+  @Test
+  def testStructuredStreamingWithInlineClustering(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
+
+    def checkClusteringResult(destPath: String):Unit = {
+      // check that clustering was scheduled and completed, leaving a single file group
+      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      metaClient.reloadActiveTimeline()
+      assertEquals(1, getLatestFileGroupsFileId.size)
+    }
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, 
checkClusteringResult)
+  }
+
+  @Test
+  def testStructuredStreamingWithoutInlineClustering(): Unit = {
+    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
+
+    def checkClusteringResult(destPath: String):Unit = {
+      val msg = "Should have replace commit completed"
+      assertThrows(classOf[IllegalStateException], new Executable {
+        override def execute(): Unit = {
+          waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+        }
+      }
+        , "Should have replace commit completed")
+      println(msg)
+    }
+    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, 
checkClusteringResult)
+  }
+
+  def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String,
+                                           isInlineClustering: Boolean, 
checkClusteringResult: String => Unit): Unit = {
+    // First insert of data
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 
100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+    // Second insert of data
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 
100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+
+    val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, 
"2", 100)
+    val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, 
hudiOptions)
+
+    val f2 = Future {
+      inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+      // wait for spark streaming to process one microbatch
+      val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
+      assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
+
+      inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
+      // wait for spark streaming to process second microbatch
+      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
+      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
+
+      // check that there is more than one file group
+      this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+      assertTrue(getLatestFileGroupsFileId().size > 1)
+
+      // check clustering result
+      checkClusteringResult(destPath)
+
+      // check that the data is correct after clustering
+      val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
+        .load(destPath + "/*/*/*/*")
+      assertEquals(200, hoodieROViewDF2.count())
+    }
+    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+  }
+
+  private def getLatestFileGroupsFileId():Array[String] = {
+    getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline,
+      HoodieTestTable.of(metaClient).listAllBaseFiles())
+    
tableView.getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+      .toArray().map(slice => 
slice.asInstanceOf[FileSlice].getFileGroupId.getFileId)
+  }
+
+  @throws[InterruptedException]
+  private def waitTillHasCompletedReplaceInstant(tablePath: String,
+                                                timeoutSecs: Int, 
sleepSecsAfterEachRun: Int) = {
+    val beginTime = System.currentTimeMillis
+    var currTime = beginTime
+    val timeoutMsecs = timeoutSecs * 1000
+    var success = false
+    while ({!success && (currTime - beginTime) < timeoutMsecs}) try {
+      this.metaClient.reloadActiveTimeline()
+      val completeReplaceSize = 
this.metaClient.getActiveTimeline.getCompletedReplaceTimeline().getInstants.toArray.size
+      println("completeReplaceSize:" + completeReplaceSize)
+      if(completeReplaceSize > 0) {
+        success = true
+      }
+    } catch {
+      case te: TableNotFoundException =>
+        log.info("Got table not found exception. Retrying")
+    } finally {
+      Thread.sleep(sleepSecsAfterEachRun * 1000)
+      currTime = System.currentTimeMillis
+    }
+    if (!success) throw new IllegalStateException("Timed-out waiting for "  + 
" have completed replace instant appear in " + tablePath)
+  }
+
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 9b0097e..6966e2c 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -622,23 +623,14 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     String tableBasePath = dfsBasePath + "/" + tempDir;
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
-
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
     cfg.continuousMode = true;
     cfg.tableType = tableType.name();
     cfg.configs.add(String.format("%s=%d", 
SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     cfg.configs.add(String.format("%s=false", 
HoodieCompactionConfig.AUTO_CLEAN_PROP));
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
-      try {
-        ds.sync();
-      } catch (Exception ex) {
-        throw new RuntimeException(ex.getMessage(), ex);
-      }
-    });
 
-    TestHelpers.waitTillCondition((r) -> {
+    deltaStreamerTestRunner(cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
         TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
         TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
@@ -648,11 +640,48 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
       TestHelpers.assertRecordCount(totalRecords, tableBasePath + 
"/*/*.parquet", sqlContext);
       TestHelpers.assertDistanceCount(totalRecords, tableBasePath + 
"/*/*.parquet", sqlContext);
       return true;
-    }, 180);
+    });
+  }
+
+  private void deltaStreamerTestRunner(HoodieDeltaStreamer.Config cfg, 
Function<Boolean, Boolean> condition) throws Exception {
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
+      try {
+        ds.sync();
+      } catch (Exception ex) {
+        throw new RuntimeException(ex.getMessage(), ex);
+      }
+    });
+
+    TestHelpers.waitTillCondition(condition, 180);
     ds.shutdownGracefully();
     dsFuture.get();
   }
 
+  @Test
+  public void testInlineClustering() throws Exception {
+    String tableBasePath = dfsBasePath + "/inlineClustering";
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    cfg.configs.add(String.format("%s=%d", 
SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfg.configs.add(String.format("%s=false", 
HoodieCompactionConfig.AUTO_CLEAN_PROP));
+    cfg.configs.add(String.format("%s=%s", 
HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true"));
+    cfg.configs.add(String.format("%s=%s", 
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
+
+    deltaStreamerTestRunner(cfg, (r) -> {
+      HoodieTableMetaClient metaClient =  new 
HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+      int pendingReplaceSize = 
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
+      int completeReplaceSize = 
metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
+      LOG.info("PendingReplaceSize=" + pendingReplaceSize + 
",completeReplaceSize = " + completeReplaceSize);
+      return completeReplaceSize > 0;
+    });
+  }
+
   /**
    * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental 
processing using a 2 step pipeline The first
    * step involves using a SQL template to transform a source TEST-DATA-SOURCE 
============================> HUDI TABLE

Reply via email to