This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2644803d8b0 [HUDI-6445] Fix flaky TestIncrementalReadWithFullTableScan (#9062)
2644803d8b0 is described below
commit 2644803d8b0112ceebe2e32412a4fdd985bc173e
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Jun 28 04:23:12 2023 +0530
[HUDI-6445] Fix flaky TestIncrementalReadWithFullTableScan (#9062)
---
.../TestIncrementalReadWithFullTableScan.scala | 32 +++++++++-------------
1 file changed, 13 insertions(+), 19 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 1d89f105331..7c89f36562b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -28,34 +28,20 @@ import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
-import org.slf4j.LoggerFactory
import scala.collection.JavaConversions.asScalaBuffer
class TestIncrementalReadWithFullTableScan extends HoodieSparkClientTestBase {
- var spark: SparkSession = null
- private val log = LoggerFactory.getLogger(classOf[TestIncrementalReadWithFullTableScan])
-
+ var spark: SparkSession = _
private val perBatchSize = 100
- val commonOpts = Map(
- "hoodie.insert.shuffle.parallelism" -> "4",
- "hoodie.upsert.shuffle.parallelism" -> "4",
- DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
- DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
- DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
- HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
- HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
- )
-
-
val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"
@@ -69,14 +55,22 @@ class TestIncrementalReadWithFullTableScan extends HoodieSparkClientTestBase {
}
@AfterEach override def tearDown() = {
- cleanupSparkContexts()
- cleanupTestDataGenerator()
- cleanupFileSystem()
+ spark = null
+ cleanupResources()
}
@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testFailEarlyForIncrViewQueryForNonExistingFiles(tableType: HoodieTableType): Unit = {
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ )
// Create 10 commits
for (i <- 1 to 10) {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), perBatchSize)).toList