This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b21ae68 [MINOR] Improving runtime of TestStructuredStreaming by 2 mins (#3382)
b21ae68 is described below
commit b21ae68e6710cab12a95eb62552541b47ef891cb
Author: vinoth chandar <[email protected]>
AuthorDate: Mon Aug 2 13:42:46 2021 -0700
[MINOR] Improving runtime of TestStructuredStreaming by 2 mins (#3382)
---
.../hudi/functional/TestStructuredStreaming.scala | 38 +++++++---------------
1 file changed, 11 insertions(+), 27 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 66cb1ca..483ed92 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -30,8 +30,7 @@ import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
-import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.collection.JavaConversions._
@@ -183,8 +182,10 @@ class TestStructuredStreaming extends HoodieClientTestBase
{
case te: TableNotFoundException =>
log.info("Got table not found exception. Retrying")
} finally {
- Thread.sleep(sleepSecsAfterEachRun * 1000)
- currTime = System.currentTimeMillis
+ if (!success) {
+ Thread.sleep(sleepSecsAfterEachRun * 1000)
+ currTime = System.currentTimeMillis
+ }
}
if (!success) throw new IllegalStateException("Timed-out waiting for " +
numCommits + " commits to appear in " + tablePath)
numInstants
@@ -207,7 +208,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+ waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
@@ -221,7 +222,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+ waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
@@ -235,7 +236,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
def checkClusteringResult(destPath: String):Unit = {
// check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+ waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
@@ -243,23 +244,6 @@ class TestStructuredStreaming extends HoodieClientTestBase
{
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
}
- @Test
- def testStructuredStreamingWithoutClustering(): Unit = {
- val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
-
- def checkClusteringResult(destPath: String):Unit = {
- val msg = "Should have replace commit completed"
- assertThrows(classOf[IllegalStateException], new Executable {
- override def execute(): Unit = {
- waitTillHasCompletedReplaceInstant(destPath, 120, 5)
- }
- }, msg)
- println(msg)
- }
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
false, false,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
- }
-
def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath:
String, isInlineClustering: Boolean,
isAsyncClustering: Boolean,
isAsyncCompaction: Boolean,
partitionOfRecords: String,
checkClusteringResult: String => Unit): Unit = {
@@ -285,17 +269,17 @@ class TestStructuredStreaming extends
HoodieClientTestBase {
// wait for spark streaming to process second microbatch
currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits +
1, 120, 5)
// for inline clustering, clustering may be complete along with 2nd
commit
- if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs,
destPath).getCompletedReplaceTimeline().countInstants() > 0) {
+ if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs,
destPath).getCompletedReplaceTimeline.countInstants() > 0) {
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath,
"000").size())
// check have at least one file group
this.metaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
- .setLoadActiveTimelineOnLoad(true).build()
+ .setLoadActiveTimelineOnLoad(true).build()
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
} else {
assertEquals(currNumCommits,
HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// check have more than one file group
this.metaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
- .setLoadActiveTimelineOnLoad(true).build()
+ .setLoadActiveTimelineOnLoad(true).build()
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
}