This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b21ae68  [MINOR] Improving runtime of TestStructuredStreaming by 2 mins (#3382)
b21ae68 is described below

commit b21ae68e6710cab12a95eb62552541b47ef891cb
Author: vinoth chandar <[email protected]>
AuthorDate: Mon Aug 2 13:42:46 2021 -0700

    [MINOR] Improving runtime of TestStructuredStreaming by 2 mins (#3382)
---
 .../hudi/functional/TestStructuredStreaming.scala  | 38 +++++++---------------
 1 file changed, 11 insertions(+), 27 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 66cb1ca..483ed92 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -30,8 +30,7 @@ import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.sql.types.StructType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
-import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import scala.collection.JavaConversions._
@@ -183,8 +182,10 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       case te: TableNotFoundException =>
         log.info("Got table not found exception. Retrying")
     } finally {
-      Thread.sleep(sleepSecsAfterEachRun * 1000)
-      currTime = System.currentTimeMillis
+      if (!success) {
+        Thread.sleep(sleepSecsAfterEachRun * 1000)
+        currTime = System.currentTimeMillis
+      }
     }
     if (!success) throw new IllegalStateException("Timed-out waiting for " + 
numCommits + " commits to appear in " + tablePath)
     numInstants
@@ -207,7 +208,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     def checkClusteringResult(destPath: String):Unit = {
       // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
@@ -221,7 +222,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     def checkClusteringResult(destPath: String):Unit = {
       // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
@@ -235,7 +236,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
     def checkClusteringResult(destPath: String):Unit = {
       // check have schedule clustering and clustering file group to one
-      waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+      waitTillHasCompletedReplaceInstant(destPath, 120, 1)
       metaClient.reloadActiveTimeline()
       assertEquals(1, 
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
     }
@@ -243,23 +244,6 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
   }
 
-  @Test
-  def testStructuredStreamingWithoutClustering(): Unit = {
-    val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
-
-    def checkClusteringResult(destPath: String):Unit = {
-      val msg = "Should have replace commit completed"
-      assertThrows(classOf[IllegalStateException], new Executable {
-        override def execute(): Unit = {
-          waitTillHasCompletedReplaceInstant(destPath, 120, 5)
-        }
-      }, msg)
-      println(msg)
-    }
-    structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, 
false, false,
-      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
checkClusteringResult)
-  }
-
   def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: 
String, isInlineClustering: Boolean,
                                                  isAsyncClustering: Boolean, 
isAsyncCompaction: Boolean,
                                                  partitionOfRecords: String, 
checkClusteringResult: String => Unit): Unit = {
@@ -285,17 +269,17 @@ class TestStructuredStreaming extends HoodieClientTestBase {
       // wait for spark streaming to process second microbatch
       currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits + 
1, 120, 5)
       // for inline clustering, clustering may be complete along with 2nd 
commit
-      if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, 
destPath).getCompletedReplaceTimeline().countInstants() > 0) {
+      if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, 
destPath).getCompletedReplaceTimeline.countInstants() > 0) {
         assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
         // check have at least one file group
         this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-        .setLoadActiveTimelineOnLoad(true).build()
+          .setLoadActiveTimelineOnLoad(true).build()
         assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
       } else {
         assertEquals(currNumCommits, 
HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
         // check have more than one file group
         this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
-        .setLoadActiveTimelineOnLoad(true).build()
+          .setLoadActiveTimelineOnLoad(true).build()
         assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
       }
 

Reply via email to