This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 192bedf2503a refactor(spark): Keep one latestCommitCompletionTime method in DataSourceTestUtils (#17608)
192bedf2503a is described below
commit 192bedf2503a91db533e460c1ee67385caa8cb96
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Dec 16 16:22:23 2025 -0800
refactor(spark): Keep one latestCommitCompletionTime method in DataSourceTestUtils (#17608)
---
.../apache/hudi/testutils/DataSourceTestUtils.java | 5 ----
.../hudi/functional/TestCOWDataSourceStorage.scala | 25 ++++++++++----------
.../functional/TestDataSourceForBootstrap.scala | 14 +++++++----
.../hudi/functional/TestSparkDataSource.scala | 27 +++++++++++-----------
.../dml/others/TestHoodieTableValuedFunction.scala | 9 ++++----
.../sql/hudi/dml/others/TestMergeIntoTable.scala | 9 ++++----
.../hudi/procedure/TestCopyToTableProcedure.scala | 9 ++++----
7 files changed, 51 insertions(+), 47 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index f2c2c06aeef3..49e904b9c403 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -184,11 +184,6 @@ public class DataSourceTestUtils {
return true;
}
- public static String latestCommitCompletionTime(FileSystem fs, String
basePath) {
- return HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
- .getLatestCompletionTime().orElse(null);
- }
-
public static String latestCommitCompletionTime(HoodieStorage storage,
String basePath) {
return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage,
basePath)
.getLatestCompletionTime().orElse(null);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 59edb399259a..158c98ebeac1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -28,10 +28,11 @@ import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodiePreCommitValidatorConfig,
HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieUpsertException,
HoodieValidationException}
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator,
TimestampBasedKeyGenerator}
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.{DataSourceTestUtils,
SparkClientFunctionalTestHarness}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
@@ -96,7 +97,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
options += TIMESTAMP_OUTPUT_DATE_FORMAT.key -> "yyyyMMdd"
}
val dataGen = new HoodieTestDataGenerator(0xDEED)
- val fs = HadoopFSUtils.getFs(basePath,
spark.sparkContext.hadoopConfiguration)
+ val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
// Insert Operation
val records0 = recordsToStrings(dataGen.generateInserts("000",
100)).asScala.toList
val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
@@ -106,8 +107,8 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Overwrite)
.save(basePath)
- val completionTime1 = DataSourceTestUtils.latestCommitCompletionTime(fs,
basePath)
- assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ val completionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
// Snapshot query
val snapshotDF1 = spark.read.format("org.apache.hudi")
@@ -137,7 +138,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Append)
.save(basePath)
- val completionTime2 = DataSourceTestUtils.latestCommitCompletionTime(fs,
basePath)
+ val completionTime2 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val snapshotDF2 = spark.read.format("hudi")
.options(readOptions)
.load(basePath)
@@ -172,8 +173,8 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Append)
.save(basePath)
- val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size())
+ val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(storage,
basePath)
+ assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").size())
// Snapshot Query
val snapshotDF3 = spark.read.format("org.apache.hudi")
@@ -183,7 +184,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
// Read Incremental Query
// we have 2 commits, try pulling the first commit (which is not the
latest)
- val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").get(0)
+ val firstCommit = HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").get(0)
// Setting HoodieROTablePathFilter here to test whether pathFilter can
filter out correctly for IncrementalRelation
spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class",
"org.apache.hudi.hadoop.HoodieROTablePathFilter")
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
@@ -325,7 +326,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
}
val dataGen = new HoodieTestDataGenerator(0xDEED)
- val fs = HadoopFSUtils.getFs(basePath,
spark.sparkContext.hadoopConfiguration)
+ val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
val records = recordsToStrings(dataGen.generateInserts("001",
100)).asScala.toList
// First commit, new partition, no existing table schema
@@ -337,7 +338,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.options(options)
.mode(SaveMode.Overwrite)
.save(basePath)
- assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size())
+ assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").size())
// Second commit, new partition, has existing table schema
// Validation should succeed
@@ -352,7 +353,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.options(options)
.mode(SaveMode.Append)
.save(basePath)
- assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size())
+ assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").size())
// Third commit, new or existing partition, overwrite "driver" column to
null for validation
// Validation should succeed or fail, based on the query
@@ -370,7 +371,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.options(options)
.mode(SaveMode.Append)
.save(basePath)
- assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size())
+ assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").size())
} else {
assertThrowsWithPreCommitValidator(new Executable() {
override def execute(): Unit = {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 7b7345398699..0ce0ce2d1ee2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -25,10 +25,12 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieClusteringConfig,
HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols,
sort}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -71,6 +73,7 @@ class TestDataSourceForBootstrap {
var basePath: String = _
var srcPath: String = _
var fs: FileSystem = _
+ var storage: HoodieStorage = _
val partitionPaths: List[String] = List("2020-04-01", "2020-04-02",
"2020-04-03")
val numRecords: Int = 100
@@ -93,6 +96,7 @@ class TestDataSourceForBootstrap {
basePath = tempDir.toAbsolutePath.toString + "/base"
srcPath = tempDir.toAbsolutePath.toString + "/src"
fs = HadoopFSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+ storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
}
@AfterEach def tearDown(): Unit ={
@@ -130,7 +134,7 @@ class TestDataSourceForBootstrap {
extraOpts = options ++
Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
bootstrapKeygenClass = bootstrapKeygenClass
)
- val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+ val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -205,7 +209,7 @@ class TestDataSourceForBootstrap {
readOpts ++ getRecordTypeOpts(recordType),
classOf[SimpleKeyGenerator].getName)
- val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+ val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -281,7 +285,7 @@ class TestDataSourceForBootstrap {
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
- val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+ val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
// Read bootstrapped table and verify count using glob path
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath)
@@ -582,8 +586,8 @@ class TestDataSourceForBootstrap {
.mode(SaveMode.Overwrite)
.save(basePath)
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs,
basePath)
- val commitInstantCompletionTime1: String =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+ val commitInstantTime1: String =
HoodieDataSourceHelpers.latestCommit(storage, basePath)
+ val commitInstantCompletionTime1: String =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
// Read bootstrapped table and verify count
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index f5ee90920f5f..a5f92e55fa9f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -27,12 +27,13 @@ import org.apache.hudi.common.table.{HoodieTableConfig,
HoodieTableMetaClient}
import org.apache.hudi.common.table.read.CustomPayloadForTesting
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.functional.CommonOptionUtils.getWriterReaderOpts
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.{DataSourceTestUtils,
SparkClientFunctionalTestHarness}
import
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
@@ -86,7 +87,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
// order of cols in inputDf and hudiDf differs slightly. so had to choose
columns specifically to compare df directly.
val colsToSelect = "_row_key, begin_lat, begin_lon, city_to_state.LA,
current_date, current_ts, distance_in_meters, driver, end_lat, end_lon,
fare.amount, fare.currency, partition, partition_path, rider, timestamp,
weight, _hoodie_is_deleted"
val dataGen = new HoodieTestDataGenerator(0xDEED)
- val fs = HadoopFSUtils.getFs(basePath,
spark.sparkContext.hadoopConfiguration)
+ val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
// Insert Operation
val records0 = recordsToStrings(dataGen.generateInserts("000",
10)).asScala.toList
val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0,
parallelism)).cache
@@ -95,8 +96,8 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
- val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
- assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
// Snapshot query
val snapshotDf1 = spark.read.format("org.apache.hudi")
@@ -113,8 +114,8 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
.options(options)
.mode(SaveMode.Append)
.save(basePath)
- val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- val commitCompletionTime2 =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+ val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(storage,
basePath)
+ val commitCompletionTime2 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val snapshotDf2 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
@@ -132,9 +133,9 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Append)
.save(basePath)
- val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- val commitCompletionTime3 =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
- assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size())
+ val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(storage,
basePath)
+ val commitCompletionTime3 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").size())
// Snapshot Query
val snapshotDf3 = spark.read.format("org.apache.hudi")
@@ -146,7 +147,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
// Read Incremental Query
// we have 2 commits, try pulling the first commit (which is not the
latest)
- val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").get(0)
+ val firstCommit = HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").get(0)
val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, "000")
@@ -249,7 +250,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
// order of cols in inputDf and hudiDf differs slightly. so had to choose
columns specifically to compare df directly.
val colsToSelect = "_row_key, begin_lat, begin_lon, city_to_state.LA,
current_date, current_ts, distance_in_meters, driver, end_lat, end_lon,
fare.amount, fare.currency, partition, partition_path, rider, timestamp,
weight, _hoodie_is_deleted"
val dataGen = new HoodieTestDataGenerator(0xDEED)
- val fs = HadoopFSUtils.getFs(basePath,
spark.sparkContext.hadoopConfiguration)
+ val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
// Insert Operation
val records0 = recordsToStrings(dataGen.generateInserts("000",
10)).asScala.toList
val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0,
parallelism)).cache
@@ -259,7 +260,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Overwrite)
.save(basePath)
- assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
// Snapshot query
val snapshotDf1 = spark.read.format("org.apache.hudi")
@@ -290,7 +291,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Append)
.save(basePath)
- assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size())
+ assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage,
basePath, "000").size())
// Snapshot Query
val snapshotDf3 = spark.read.format("org.apache.hudi")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
index c693ed0a8802..7761b8ae3c9d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
@@ -20,9 +20,10 @@
package org.apache.spark.sql.hudi.dml.others
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.common.testutils.HoodieTestUtils
import
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
import org.apache.hudi.metadata.MetadataPartitionType
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.sql.functions.{col, from_json}
@@ -128,8 +129,8 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
| """.stripMargin
)
- val fs = HadoopFSUtils.getFs(tablePath,
spark.sessionState.newHadoopConf())
- val firstCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+ val storage = HoodieTestUtils.getStorage(new StoragePath(tablePath))
+ val firstCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
checkAnswer(
s"""select id,
@@ -150,7 +151,7 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
| values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3,
'a3_3', 30, 1100)
| """.stripMargin
)
- val secondCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+ val secondCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
checkAnswer(
s"""select id,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index d83bdccea51c..0ededcb7ae92 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -23,9 +23,10 @@ import org.apache.hudi.{DataSourceReadOptions,
DataSourceWriteOptions, ScalaAsse
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
@@ -1031,8 +1032,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
checkAnswer(s"select id, name, price, _ts from $targetTable")(
Seq(1, "a1", 10, 1000)
)
- val fs = HadoopFSUtils.getFs(targetBasePath,
spark.sessionState.newHadoopConf())
- val firstCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
+ val storage = HoodieTestUtils.getStorage(new
StoragePath(targetBasePath))
+ val firstCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, targetBasePath)
// Second merge
spark.sql(s"update $sourceTable set price = 12, _ts = 1001 where id
= 1")
@@ -1046,7 +1047,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
checkAnswer(s"select id, name, price, _ts from $targetTable")(
Seq(1, "a1", 12, 1001)
)
- val secondCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
+ val secondCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, targetBasePath)
// Test incremental query
val hudiIncDF1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
index abf0b98ac2c3..d170afd117b3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.hudi.procedure
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.sql.Row
@@ -192,11 +193,11 @@ class TestCopyToTableProcedure extends
HoodieSparkProcedureTestBase {
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// mark startCompletionTime
- val fs = HadoopFSUtils.getFs(tablePath,
spark.sessionState.newHadoopConf())
- val startCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+ val storage = HoodieTestUtils.getStorage(new StoragePath(tablePath))
+ val startCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
- val endCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+ val endCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
val copyTableName = generateTableName
// Check required fields