This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 192bedf2503a refactor(spark): Keep one latestCommitCompletionTime 
method in DataSourceTestUtils  (#17608)
192bedf2503a is described below

commit 192bedf2503a91db533e460c1ee67385caa8cb96
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Dec 16 16:22:23 2025 -0800

    refactor(spark): Keep one latestCommitCompletionTime method in 
DataSourceTestUtils  (#17608)
---
 .../apache/hudi/testutils/DataSourceTestUtils.java |  5 ----
 .../hudi/functional/TestCOWDataSourceStorage.scala | 25 ++++++++++----------
 .../functional/TestDataSourceForBootstrap.scala    | 14 +++++++----
 .../hudi/functional/TestSparkDataSource.scala      | 27 +++++++++++-----------
 .../dml/others/TestHoodieTableValuedFunction.scala |  9 ++++----
 .../sql/hudi/dml/others/TestMergeIntoTable.scala   |  9 ++++----
 .../hudi/procedure/TestCopyToTableProcedure.scala  |  9 ++++----
 7 files changed, 51 insertions(+), 47 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index f2c2c06aeef3..49e904b9c403 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -184,11 +184,6 @@ public class DataSourceTestUtils {
     return true;
   }
 
-  public static String latestCommitCompletionTime(FileSystem fs, String 
basePath) {
-    return HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
-        .getLatestCompletionTime().orElse(null);
-  }
-
   public static String latestCommitCompletionTime(HoodieStorage storage, 
String basePath) {
     return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage, 
basePath)
         .getLatestCompletionTime().orElse(null);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 59edb399259a..158c98ebeac1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -28,10 +28,11 @@ import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.{HoodiePreCommitValidatorConfig, 
HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieUpsertException, 
HoodieValidationException}
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, 
TimestampBasedKeyGenerator}
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.{DataSourceTestUtils, 
SparkClientFunctionalTestHarness}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
@@ -96,7 +97,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       options += TIMESTAMP_OUTPUT_DATE_FORMAT.key -> "yyyyMMdd"
     }
     val dataGen = new HoodieTestDataGenerator(0xDEED)
-    val fs = HadoopFSUtils.getFs(basePath, 
spark.sparkContext.hadoopConfiguration)
+    val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
     // Insert Operation
     val records0 = recordsToStrings(dataGen.generateInserts("000", 
100)).asScala.toList
     val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
@@ -106,8 +107,8 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    val completionTime1 = DataSourceTestUtils.latestCommitCompletionTime(fs, 
basePath)
-    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val completionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
 
     // Snapshot query
     val snapshotDF1 = spark.read.format("org.apache.hudi")
@@ -137,7 +138,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val completionTime2 = DataSourceTestUtils.latestCommitCompletionTime(fs, 
basePath)
+    val completionTime2 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     val snapshotDF2 = spark.read.format("hudi")
       .options(readOptions)
       .load(basePath)
@@ -172,8 +173,8 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").size())
+    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(storage, 
basePath)
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").size())
 
     // Snapshot Query
     val snapshotDF3 = spark.read.format("org.apache.hudi")
@@ -183,7 +184,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
     // Read Incremental Query
     // we have 2 commits, try pulling the first commit (which is not the 
latest)
-    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").get(0)
+    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").get(0)
     // Setting HoodieROTablePathFilter here to test whether pathFilter can 
filter out correctly for IncrementalRelation
     
spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class", 
"org.apache.hudi.hadoop.HoodieROTablePathFilter")
     val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
@@ -325,7 +326,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     }
 
     val dataGen = new HoodieTestDataGenerator(0xDEED)
-    val fs = HadoopFSUtils.getFs(basePath, 
spark.sparkContext.hadoopConfiguration)
+    val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
     val records = recordsToStrings(dataGen.generateInserts("001", 
100)).asScala.toList
 
     // First commit, new partition, no existing table schema
@@ -337,7 +338,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .options(options)
       .mode(SaveMode.Overwrite)
       .save(basePath)
-    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").size())
+    assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").size())
 
     // Second commit, new partition, has existing table schema
     // Validation should succeed
@@ -352,7 +353,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
-    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").size())
+    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").size())
 
     // Third commit, new or existing partition, overwrite "driver" column to 
null for validation
     // Validation should succeed or fail, based on the query
@@ -370,7 +371,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
         .options(options)
         .mode(SaveMode.Append)
         .save(basePath)
-      assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").size())
+      assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").size())
     } else {
       assertThrowsWithPreCommitValidator(new Executable() {
         override def execute(): Unit = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 7b7345398699..0ce0ce2d1ee2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -25,10 +25,12 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieClusteringConfig, 
HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, 
sort}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -71,6 +73,7 @@ class TestDataSourceForBootstrap {
   var basePath: String = _
   var srcPath: String = _
   var fs: FileSystem = _
+  var storage: HoodieStorage = _
 
   val partitionPaths: List[String] = List("2020-04-01", "2020-04-02", 
"2020-04-03")
   val numRecords: Int = 100
@@ -93,6 +96,7 @@ class TestDataSourceForBootstrap {
     basePath = tempDir.toAbsolutePath.toString + "/base"
     srcPath = tempDir.toAbsolutePath.toString + "/src"
     fs = HadoopFSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+    storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
   }
 
   @AfterEach def tearDown(): Unit ={
@@ -130,7 +134,7 @@ class TestDataSourceForBootstrap {
       extraOpts = options ++ 
Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
       bootstrapKeygenClass = bootstrapKeygenClass
     )
-    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
@@ -205,7 +209,7 @@ class TestDataSourceForBootstrap {
       readOpts ++ getRecordTypeOpts(recordType),
       classOf[SimpleKeyGenerator].getName)
 
-    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
 
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -281,7 +285,7 @@ class TestDataSourceForBootstrap {
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
-    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
 
     // Read bootstrapped table and verify count using glob path
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath)
@@ -582,8 +586,8 @@ class TestDataSourceForBootstrap {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, 
basePath)
-    val commitInstantCompletionTime1: String = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+    val commitInstantTime1: String = 
HoodieDataSourceHelpers.latestCommit(storage, basePath)
+    val commitInstantCompletionTime1: String = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
     assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
 
     // Read bootstrapped table and verify count
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index f5ee90920f5f..a5f92e55fa9f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -27,12 +27,13 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.CustomPayloadForTesting
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.functional.CommonOptionUtils.getWriterReaderOpts
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.{DataSourceTestUtils, 
SparkClientFunctionalTestHarness}
 import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 
@@ -86,7 +87,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
     // order of cols in inputDf and hudiDf differs slightly. so had to choose 
columns specifically to compare df directly.
     val colsToSelect = "_row_key, begin_lat,  begin_lon, city_to_state.LA, 
current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, 
fare.amount, fare.currency, partition, partition_path, rider, timestamp, 
weight, _hoodie_is_deleted"
     val dataGen = new HoodieTestDataGenerator(0xDEED)
-    val fs = HadoopFSUtils.getFs(basePath, 
spark.sparkContext.hadoopConfiguration)
+    val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
     // Insert Operation
     val records0 = recordsToStrings(dataGen.generateInserts("000", 
10)).asScala.toList
     val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0, 
parallelism)).cache
@@ -95,8 +96,8 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
-    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
-    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val commitCompletionTime1 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
 
     // Snapshot query
     val snapshotDf1 = spark.read.format("org.apache.hudi")
@@ -113,8 +114,8 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
       .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
-    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    val commitCompletionTime2 = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
+    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(storage, 
basePath)
+    val commitCompletionTime2 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
 
     val snapshotDf2 = spark.read.format("org.apache.hudi")
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
@@ -132,9 +133,9 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    val commitCompletionTime3 = 
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
-    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").size())
+    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(storage, 
basePath)
+    val commitCompletionTime3 = 
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").size())
 
     // Snapshot Query
     val snapshotDf3 = spark.read.format("org.apache.hudi")
@@ -146,7 +147,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
 
     // Read Incremental Query
     // we have 2 commits, try pulling the first commit (which is not the 
latest)
-    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").get(0)
+    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").get(0)
     val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, "000")
@@ -249,7 +250,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
     // order of cols in inputDf and hudiDf differs slightly. so had to choose 
columns specifically to compare df directly.
     val colsToSelect = "_row_key, begin_lat,  begin_lon, city_to_state.LA, 
current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, 
fare.amount, fare.currency, partition, partition_path, rider, timestamp, 
weight, _hoodie_is_deleted"
     val dataGen = new HoodieTestDataGenerator(0xDEED)
-    val fs = HadoopFSUtils.getFs(basePath, 
spark.sparkContext.hadoopConfiguration)
+    val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
     // Insert Operation
     val records0 = recordsToStrings(dataGen.generateInserts("000", 
10)).asScala.toList
     val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0, 
parallelism)).cache
@@ -259,7 +260,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
 
     // Snapshot query
     val snapshotDf1 = spark.read.format("org.apache.hudi")
@@ -290,7 +291,7 @@ class TestSparkDataSource extends 
SparkClientFunctionalTestHarness {
       .mode(SaveMode.Append)
       .save(basePath)
 
-    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
"000").size())
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(storage, 
basePath, "000").size())
 
     // Snapshot Query
     val snapshotDf3 = spark.read.format("org.apache.hudi")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
index c693ed0a8802..7761b8ae3c9d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
@@ -20,9 +20,10 @@
 package org.apache.spark.sql.hudi.dml.others
 
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
 import org.apache.hudi.metadata.MetadataPartitionType
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.DataSourceTestUtils
 
 import org.apache.spark.sql.functions.{col, from_json}
@@ -128,8 +129,8 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
                | """.stripMargin
           )
 
-          val fs = HadoopFSUtils.getFs(tablePath, 
spark.sessionState.newHadoopConf())
-          val firstCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+          val storage = HoodieTestUtils.getStorage(new StoragePath(tablePath))
+          val firstCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
 
           checkAnswer(
             s"""select id,
@@ -150,7 +151,7 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
                | values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3, 
'a3_3', 30, 1100)
                | """.stripMargin
           )
-          val secondCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+          val secondCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
 
           checkAnswer(
             s"""select id,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index d83bdccea51c..0ededcb7ae92 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -23,9 +23,10 @@ import org.apache.hudi.{DataSourceReadOptions, 
DataSourceWriteOptions, ScalaAsse
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
@@ -1031,8 +1032,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
           checkAnswer(s"select id, name, price, _ts from $targetTable")(
             Seq(1, "a1", 10, 1000)
           )
-          val fs = HadoopFSUtils.getFs(targetBasePath, 
spark.sessionState.newHadoopConf())
-          val firstCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
+          val storage = HoodieTestUtils.getStorage(new 
StoragePath(targetBasePath))
+          val firstCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, targetBasePath)
 
           // Second merge
           spark.sql(s"update $sourceTable set price = 12, _ts = 1001 where id 
= 1")
@@ -1046,7 +1047,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
           checkAnswer(s"select id, name, price, _ts from $targetTable")(
             Seq(1, "a1", 12, 1001)
           )
-          val secondCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
+          val secondCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, targetBasePath)
           // Test incremental query
           val hudiIncDF1 = spark.read.format("org.apache.hudi")
             .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
index abf0b98ac2c3..d170afd117b3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.hudi.procedure
 
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.DataSourceTestUtils
 
 import org.apache.spark.sql.Row
@@ -192,11 +193,11 @@ class TestCopyToTableProcedure extends 
HoodieSparkProcedureTestBase {
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
       // mark startCompletionTime
-      val fs = HadoopFSUtils.getFs(tablePath, 
spark.sessionState.newHadoopConf())
-      val startCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+      val storage = HoodieTestUtils.getStorage(new StoragePath(tablePath))
+      val startCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
       spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
       spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
-      val endCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+      val endCompletionTime = 
DataSourceTestUtils.latestCommitCompletionTime(storage, tablePath)
 
       val copyTableName = generateTableName
       // Check required fields

Reply via email to