This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.15.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 72dd5183ba54e2885792d481f30db35166bef218
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat May 25 00:18:00 2024 -0700

    [HUDI-7790] Revert changes in DFSPathSelector and UtilHelpers.readConfig (#11294)
---
 .../org/apache/hudi/cli/commands/SparkMain.java    |  3 +-
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |  3 +-
 .../SparkDataSourceContinuousIngestTool.java       |  3 +-
 .../helpers/DFSTestSuitePathSelector.java          | 41 ++++++++--------
 .../apache/hudi/utilities/HDFSParquetImporter.java |  3 +-
 .../org/apache/hudi/utilities/HoodieCleaner.java   |  3 +-
 .../apache/hudi/utilities/HoodieClusteringJob.java |  4 +-
 .../org/apache/hudi/utilities/HoodieCompactor.java |  4 +-
 .../hudi/utilities/HoodieDataTableValidator.java   |  3 +-
 .../hudi/utilities/HoodieDropPartitionsTool.java   |  4 +-
 .../org/apache/hudi/utilities/HoodieIndexer.java   |  4 +-
 .../utilities/HoodieMetadataTableValidator.java    |  2 +-
 .../apache/hudi/utilities/HoodieRepairTool.java    |  2 +-
 .../org/apache/hudi/utilities/TableSizeStats.java  |  3 +-
 .../org/apache/hudi/utilities/UtilHelpers.java     |  8 ++--
 .../utilities/sources/helpers/DFSPathSelector.java | 54 ++++++++++------------
 .../sources/helpers/DatePartitionPathSelector.java | 46 +++++++-----------
 .../streamer/HoodieMultiTableStreamer.java         |  5 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    |  5 +-
 .../helpers/TestDFSPathSelectorCommonMethods.java  | 19 ++++----
 .../helpers/TestDatePartitionPathSelector.java     |  5 +-
 21 files changed, 105 insertions(+), 119 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index f8106ffc55c..fe13813490d 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -44,7 +44,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -483,7 +482,7 @@ public class SparkMain {
                                  String payloadClassName, String 
enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
 
     TypedProperties properties = propsFilePath == null ? 
buildProperties(configs)
-        : readConfig(jsc.hadoopConfiguration(), new 
StoragePath(propsFilePath), configs).getProps(true);
+        : readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), 
configs).getProps(true);
 
     properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);
 
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 8813129d748..70910357d7d 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -44,7 +44,6 @@ import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
@@ -115,7 +114,7 @@ public class HoodieTestSuiteJob {
         
SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
     this.fs = HadoopFSUtils.getFs(cfg.inputBasePath, 
jsc.hadoopConfiguration());
     this.props =
-        UtilHelpers.readConfig(fs.getConf(), new 
StoragePath(cfg.propsFilePath), cfg.configs).getProps();
+        UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), 
cfg.configs).getProps();
     log.info("Creating workload generator with configs : {}", 
props.toString());
     this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
     this.keyGenerator =
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
index cbb2a27e54f..81bc4435623 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
@@ -22,7 +22,6 @@ package org.apache.hudi.integ.testsuite;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.HoodieRepairTool;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -133,7 +132,7 @@ public class SparkDataSourceContinuousIngestTool {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index e2a2c19f666..70026aa5f7f 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -24,17 +24,20 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
-import org.apache.hudi.storage.StoragePathInfo;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -67,31 +70,31 @@ public class DFSTestSuitePathSelector extends 
DFSPathSelector {
       }
 
       // obtain all eligible files for the batch
-      List<StoragePathInfo> eligibleFiles = new ArrayList<>();
-      List<StoragePathInfo> pathInfoList = storage.globEntries(
-          new StoragePath(getStringWithAltKeys(props, 
DFSPathSelectorConfig.ROOT_INPUT_PATH),
-              "*"));
+      List<FileStatus> eligibleFiles = new ArrayList<>();
+      FileStatus[] fileStatuses = fs.globStatus(
+          new Path(getStringWithAltKeys(props, 
DFSPathSelectorConfig.ROOT_INPUT_PATH), "*"));
       // Say input data is as follow input/1, input/2, input/5 since 3,4 was 
rolled back and 5 is new generated data
       // checkpoint from the latest commit metadata will be 2 since 3,4 has 
been rolled back. We need to set the
       // next batch id correctly as 5 instead of 3
-      Option<String> correctBatchIdDueToRollback = 
Option.fromJavaOptional(pathInfoList.stream()
-          .map(f -> f.getPath().toString().split("/")[
-              f.getPath().toString().split("/").length - 1])
+      Option<String> correctBatchIdDueToRollback = 
Option.fromJavaOptional(Arrays.stream(fileStatuses)
+          .map(f -> 
f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
           .filter(bid1 -> Integer.parseInt(bid1) > lastBatchId)
           .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1), 
Integer.parseInt(bid2))));
-      if (correctBatchIdDueToRollback.isPresent()
-          && Integer.parseInt(correctBatchIdDueToRollback.get()) > 
nextBatchId) {
+      if (correctBatchIdDueToRollback.isPresent() && 
Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
         nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get());
       }
-      log.info("Using DFSTestSuitePathSelector, checkpoint: " + 
lastCheckpointStr + " sourceLimit: "
-          + sourceLimit + " lastBatchId: " + lastBatchId + " nextBatchId: " + 
nextBatchId);
-      for (StoragePathInfo pathInfo : pathInfoList) {
-        if (!pathInfo.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
-            .anyMatch(pfx -> pathInfo.getPath().getName().startsWith(pfx))) {
+      log.info("Using DFSTestSuitePathSelector, checkpoint: " + 
lastCheckpointStr + " sourceLimit: " + sourceLimit
+          + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
+      for (FileStatus fileStatus : fileStatuses) {
+        if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
+            .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
           continue;
-        } else if (Integer.parseInt(pathInfo.getPath().getName()) > lastBatchId
-            && Integer.parseInt(pathInfo.getPath().getName()) <= nextBatchId) {
-          eligibleFiles.addAll(storage.listFiles(pathInfo.getPath()));
+        } else if (Integer.parseInt(fileStatus.getPath().getName()) > 
lastBatchId && Integer.parseInt(fileStatus.getPath()
+            .getName()) <= nextBatchId) {
+          RemoteIterator<LocatedFileStatus> files = 
fs.listFiles(fileStatus.getPath(), true);
+          while (files.hasNext()) {
+            eligibleFiles.add(files.next());
+          }
         }
       }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 3513f7c6760..1dc24fd31b8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
 import com.beust.jcommander.IValueValidator;
@@ -114,7 +113,7 @@ public class HDFSParquetImporter implements Serializable {
   public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = HadoopFSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(fs.getConf(), new 
StoragePath(cfg.propsFilePath), cfg.configs).getProps(true);
+        : UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), 
cfg.configs).getProps(true);
     LOG.info("Starting data import with configs : " + props.toString());
     int ret = -1;
     try {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index e1d6a13cb9a..83f535191b9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.storage.StoragePath;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -62,7 +61,7 @@ public class HoodieCleaner {
      * Filesystem used.
      */
     this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(jssc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs).getProps(true);
+        : UtilHelpers.readConfig(jssc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs).getProps(true);
     LOG.info("Creating Cleaner with configs : " + props.toString());
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index b96b4610376..90c7d493705 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -29,11 +29,11 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.jetbrains.annotations.TestOnly;
 import org.slf4j.Logger;
@@ -73,7 +73,7 @@ public class HoodieClusteringJob {
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 90c66add046..82acce6a4eb 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 
@@ -38,6 +37,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
@@ -76,7 +76,7 @@ public class HoodieCompactor {
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
index 459483e547c..9953b5225a3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.repair.RepairUtils;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
@@ -139,7 +140,7 @@ public class HoodieDataTableValidator implements 
Serializable {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
index 17210d25639..05a5742e841 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -34,7 +34,6 @@ import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncConfigHolder;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 
@@ -42,6 +41,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -136,7 +136,7 @@ public class HoodieDropPartitionsTool implements 
Serializable {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index 13d168a24c0..5c626a53ae7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -31,10 +31,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StoragePath;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.jetbrains.annotations.TestOnly;
 import org.slf4j.Logger;
@@ -105,7 +105,7 @@ public class HoodieIndexer {
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
HoodieIndexer.Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index a9bade03137..bfb9e18af1b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -258,7 +258,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index b2bb34ede3b..237e0cb2263 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -518,7 +518,7 @@ public class HoodieRepairTool {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
index ff655dfd017..a9b0f70bca9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
@@ -34,7 +34,6 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
 import com.beust.jcommander.JCommander;
@@ -132,7 +131,7 @@ public class TableSizeStats implements Serializable {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, 
Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new 
Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index abf0558e5ff..74cc775718a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -118,6 +118,7 @@ import java.util.function.Supplier;
 
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 
 /**
  * Bunch of helper methods.
@@ -242,13 +243,14 @@ public class UtilHelpers {
   }
 
   public static DFSPropertiesConfiguration readConfig(Configuration 
hadoopConfig,
-                                                      StoragePath cfgPath,
+                                                      Path cfgPath,
                                                       List<String> 
overriddenProps) {
-    DFSPropertiesConfiguration conf = new 
DFSPropertiesConfiguration(hadoopConfig, cfgPath);
+    StoragePath storagePath = convertToStoragePath(cfgPath);
+    DFSPropertiesConfiguration conf = new 
DFSPropertiesConfiguration(hadoopConfig, storagePath);
     try {
       if (!overriddenProps.isEmpty()) {
         LOG.info("Adding overridden properties to file properties.");
-        conf.addPropsFromStream(new BufferedReader(new 
StringReader(String.join("\n", overriddenProps))), cfgPath);
+        conf.addPropsFromStream(new BufferedReader(new 
StringReader(String.join("\n", overriddenProps))), storagePath);
       }
     } catch (IOException ioe) {
       throw new HoodieIOException("Unexpected error adding config overrides", 
ioe);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 62f182df359..257c015c53b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -26,13 +26,12 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,15 +65,15 @@ public class DFSPathSelector implements Serializable {
 
   protected static final List<String> IGNORE_FILEPREFIX_LIST = 
Arrays.asList(".", "_");
 
-  protected final transient HoodieStorage storage;
+  protected final transient FileSystem fs;
   protected final TypedProperties props;
 
   public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
     checkRequiredConfigProperties(
         props, 
Collections.singletonList(DFSPathSelectorConfig.ROOT_INPUT_PATH));
     this.props = props;
-    this.storage = HoodieStorageUtils.getStorage(
-        getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH), 
HadoopFSUtils.getStorageConf(hadoopConf));
+    this.fs = HadoopFSUtils.getFs(
+        getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH), 
hadoopConf);
   }
 
   /**
@@ -125,19 +124,16 @@ public class DFSPathSelector implements Serializable {
       log.info("Root path => " + getStringWithAltKeys(props, 
DFSPathSelectorConfig.ROOT_INPUT_PATH)
           + " source limit => " + sourceLimit);
       long lastCheckpointTime = 
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
-      List<StoragePathInfo> eligibleFiles = listEligibleFiles(
-          storage, new StoragePath(getStringWithAltKeys(props,
-              DFSPathSelectorConfig.ROOT_INPUT_PATH)),
-          lastCheckpointTime);
+      List<FileStatus> eligibleFiles = listEligibleFiles(
+          fs, new Path(getStringWithAltKeys(props, 
DFSPathSelectorConfig.ROOT_INPUT_PATH)), lastCheckpointTime);
       // sort them by modification time.
-      
eligibleFiles.sort(Comparator.comparingLong(StoragePathInfo::getModificationTime));
+      
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
       // Filter based on checkpoint & input size, if needed
       long currentBytes = 0;
       long newCheckpointTime = lastCheckpointTime;
-      List<StoragePathInfo> filteredFiles = new ArrayList<>();
-      for (StoragePathInfo f : eligibleFiles) {
-        if (currentBytes + f.getLength() >= sourceLimit
-            && f.getModificationTime() > newCheckpointTime) {
+      List<FileStatus> filteredFiles = new ArrayList<>();
+      for (FileStatus f : eligibleFiles) {
+        if (currentBytes + f.getLen() >= sourceLimit && 
f.getModificationTime() > newCheckpointTime) {
           // we have enough data, we are done
           // Also, we've read up to a file with a newer modification time
           // so that some files with the same modification time won't be 
skipped in next read
@@ -145,7 +141,7 @@ public class DFSPathSelector implements Serializable {
         }
 
         newCheckpointTime = f.getModificationTime();
-        currentBytes += f.getLength();
+        currentBytes += f.getLen();
         filteredFiles.add(f);
       }
 
@@ -155,9 +151,7 @@ public class DFSPathSelector implements Serializable {
       }
 
       // read the files out.
-      String pathStr =
-          filteredFiles.stream().map(f -> f.getPath().toString())
-              .collect(Collectors.joining(","));
+      String pathStr = filteredFiles.stream().map(f -> 
f.getPath().toString()).collect(Collectors.joining(","));
 
       return new ImmutablePair<>(Option.ofNullable(pathStr), 
String.valueOf(newCheckpointTime));
     } catch (IOException ioe) {
@@ -168,17 +162,19 @@ public class DFSPathSelector implements Serializable {
   /**
    * List files recursively, filter out illegible files/directories while 
doing so.
    */
-  protected List<StoragePathInfo> listEligibleFiles(HoodieStorage storage, 
StoragePath path,
-                                                    long lastCheckpointTime) 
throws IOException {
+  protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long 
lastCheckpointTime) throws IOException {
     // skip files/dirs whose names start with (_, ., etc)
-    List<StoragePathInfo> pathInfoList = storage.listDirectEntries(path, file 
->
+    FileStatus[] statuses = fs.listStatus(path, file ->
         IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> 
file.getName().startsWith(pfx)));
-    List<StoragePathInfo> res = new ArrayList<>();
-    for (StoragePathInfo pathInfo : pathInfoList) {
-      if (pathInfo.isDirectory()) {
-        res.addAll(listEligibleFiles(storage, pathInfo.getPath(), 
lastCheckpointTime));
-      } else if (pathInfo.getModificationTime() > lastCheckpointTime && 
pathInfo.getLength() > 0) {
-        res.add(pathInfo);
+    List<FileStatus> res = new ArrayList<>();
+    for (FileStatus status : statuses) {
+      if (status.isDirectory()) {
+        // avoid infinite loop
+        if (!status.isSymlink()) {
+          res.addAll(listEligibleFiles(fs, status.getPath(), 
lastCheckpointTime));
+        }
+      } else if (status.getModificationTime() > lastCheckpointTime && 
status.getLen() > 0) {
+        res.add(status);
       }
     }
     return res;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index ab9ccbb8ca7..70acd7ca527 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -24,12 +24,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.utilities.config.DatePartitionPathSelectorConfig;
 
 import org.apache.hadoop.conf.Configuration;
@@ -136,28 +131,25 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
             + currentDate);
     long lastCheckpointTime = 
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
     HoodieSparkEngineContext context = new 
HoodieSparkEngineContext(sparkContext);
-    StorageConfiguration<?> storageConf = storage.getConf();
+    HadoopStorageConfiguration storageConf = new 
HadoopStorageConfiguration(fs.getConf());
     List<String> prunedPartitionPaths = pruneDatePartitionPaths(
-        context, storage, getStringWithAltKeys(props, ROOT_INPUT_PATH),
-        currentDate);
+        context, fs, getStringWithAltKeys(props, ROOT_INPUT_PATH), 
currentDate);
 
-    List<StoragePathInfo> eligibleFiles = context.flatMap(prunedPartitionPaths,
+    List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths,
         path -> {
-          HoodieStorage storage = HoodieStorageUtils.getStorage(path, 
storageConf);
-          return listEligibleFiles(storage, new StoragePath(path), 
lastCheckpointTime).stream();
+          FileSystem fs = new Path(path).getFileSystem(storageConf.unwrap());
+          return listEligibleFiles(fs, new Path(path), 
lastCheckpointTime).stream();
         }, partitionsListParallelism);
     // sort them by modification time ascending.
-    List<StoragePathInfo> sortedEligibleFiles = eligibleFiles.stream()
-        .sorted(Comparator.comparingLong(StoragePathInfo::getModificationTime))
-        .collect(Collectors.toList());
+    List<FileStatus> sortedEligibleFiles = eligibleFiles.stream()
+        
.sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList());
 
     // Filter based on checkpoint & input size, if needed
     long currentBytes = 0;
     long newCheckpointTime = lastCheckpointTime;
-    List<StoragePathInfo> filteredFiles = new ArrayList<>();
-    for (StoragePathInfo f : sortedEligibleFiles) {
-      if (currentBytes + f.getLength() >= sourceLimit
-          && f.getModificationTime() > newCheckpointTime) {
+    List<FileStatus> filteredFiles = new ArrayList<>();
+    for (FileStatus f : sortedEligibleFiles) {
+      if (currentBytes + f.getLen() >= sourceLimit && f.getModificationTime() 
> newCheckpointTime) {
         // we have enough data, we are done
         // Also, we've read up to a file with a newer modification time
         // so that some files with the same modification time won't be skipped 
in next read
@@ -165,7 +157,7 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
       }
 
       newCheckpointTime = f.getModificationTime();
-      currentBytes += f.getLength();
+      currentBytes += f.getLen();
       filteredFiles.add(f);
     }
 
@@ -175,9 +167,7 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
     }
 
     // read the files out.
-    String pathStr =
-        filteredFiles.stream().map(f -> f.getPath().toString())
-            .collect(Collectors.joining(","));
+    String pathStr = filteredFiles.stream().map(f -> 
f.getPath().toString()).collect(Collectors.joining(","));
 
     return new ImmutablePair<>(Option.ofNullable(pathStr), 
String.valueOf(newCheckpointTime));
   }
@@ -186,25 +176,21 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
    * Prunes date level partitions to last few days configured by 
'NUM_PREV_DAYS_TO_LIST' from
    * 'CURRENT_DATE'. Parallelizes listing by leveraging 
HoodieSparkEngineContext's methods.
    */
-  public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context,
-                                              HoodieStorage storage,
-                                              String rootPath, LocalDate 
currentDate) {
+  public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext 
context, FileSystem fs, String rootPath, LocalDate currentDate) {
     List<String> partitionPaths = new ArrayList<>();
     // get all partition paths before date partition level
     partitionPaths.add(rootPath);
     if (datePartitionDepth <= 0) {
       return partitionPaths;
     }
-    StorageConfiguration<Configuration> storageConf = 
HadoopFSUtils.getStorageConfWithCopy(
-        ((FileSystem) storage.getFileSystem()).getConf());
+    HadoopStorageConfiguration storageConf = new 
HadoopStorageConfiguration(fs.getConf());
     for (int i = 0; i < datePartitionDepth; i++) {
       partitionPaths = context.flatMap(partitionPaths, path -> {
         Path subDir = new Path(path);
         FileSystem fileSystem = subDir.getFileSystem(storageConf.unwrap());
         // skip files/dirs whose names start with (_, ., etc)
         FileStatus[] statuses = fileSystem.listStatus(subDir,
-            file -> IGNORE_FILEPREFIX_LIST.stream()
-                .noneMatch(pfx -> file.getName().startsWith(pfx)));
+            file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> 
file.getName().startsWith(pfx)));
         List<String> res = new ArrayList<>();
         for (FileStatus status : statuses) {
           res.add(status.getPath().toString());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
index f1116150be3..a637f7fbbff 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -90,7 +89,7 @@ public class HoodieMultiTableStreamer {
     FileSystem fs = HadoopFSUtils.getFs(commonPropsFile, 
jssc.hadoopConfiguration());
     configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? 
configFolder.substring(0, configFolder.length() - 1) : configFolder;
     checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
-    TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), 
new StoragePath(commonPropsFile), new ArrayList<String>()).getProps();
+    TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(commonPropsFile), new ArrayList<String>()).getProps();
     //get the tables to be ingested and their corresponding config files from 
this properties instance
     populateTableExecutionContextList(commonProperties, configFolder, fs, 
config);
   }
@@ -131,7 +130,7 @@ public class HoodieMultiTableStreamer {
       String configFilePath = getStringWithAltKeys(properties, configProp, 
oldConfigProp,
           Helpers.getDefaultConfigFilePath(configFolder, database, 
currentTable));
       checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), 
new StoragePath(configFilePath), new ArrayList<>()).getProps();
+      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(configFilePath), new ArrayList<>()).getProps();
       properties.forEach((k, v) -> {
         if (tableProperties.get(k) == null) {
           tableProperties.setProperty(k.toString(), v.toString());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 4ea84ff7a5e..4fe25870201 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -74,6 +74,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
@@ -178,7 +179,7 @@ public class HoodieStreamer implements Serializable {
     } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) 
{
       hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
     } else {
-      hoodieConfig.setAll(readConfig(hadoopConf, new 
StoragePath(cfg.propsFilePath), cfg.configs).getProps());
+      hoodieConfig.setAll(readConfig(hadoopConf, new Path(cfg.propsFilePath), 
cfg.configs).getProps());
     }
 
     // set any configs that Deltastreamer has to override explicitly
@@ -447,7 +448,7 @@ public class HoodieStreamer implements Serializable {
     public static TypedProperties getProps(Configuration conf, Config cfg) {
       return cfg.propsFilePath.isEmpty()
           ? buildProperties(cfg.configs)
-          : readConfig(conf, new StoragePath(cfg.propsFilePath), 
cfg.configs).getProps();
+          : readConfig(conf, new Path(cfg.propsFilePath), 
cfg.configs).getProps();
     }
 
     @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
index a31938c439b..684261cab84 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
@@ -23,10 +23,11 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestDFSPathSelectorCommonMethods extends 
HoodieSparkClientTestHarness {
 
   TypedProperties props;
-  StoragePath inputPath;
+  Path inputPath;
 
   @BeforeEach
   void setUp() {
@@ -56,7 +57,7 @@ public class TestDFSPathSelectorCommonMethods extends 
HoodieSparkClientTestHarne
     props = new TypedProperties();
     props.setProperty(ROOT_INPUT_PATH.key(), basePath);
     props.setProperty(PARTITIONS_LIST_PARALLELISM.key(), "1");
-    inputPath = new StoragePath(basePath);
+    inputPath = new Path(basePath);
   }
 
   @AfterEach
@@ -72,7 +73,8 @@ public class TestDFSPathSelectorCommonMethods extends 
HoodieSparkClientTestHarne
     createBaseFile(basePath, "p1", "000", ".foo2", 1);
     createBaseFile(basePath, "p1", "000", "_foo3", 1);
 
-    List<StoragePathInfo> eligibleFiles = selector.listEligibleFiles(storage, 
inputPath, 0);
+    List<FileStatus> eligibleFiles = selector.listEligibleFiles(
+        (FileSystem) storage.getFileSystem(), inputPath, 0);
     assertEquals(1, eligibleFiles.size());
     assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1"));
   }
@@ -85,7 +87,8 @@ public class TestDFSPathSelectorCommonMethods extends 
HoodieSparkClientTestHarne
     createBaseFile(basePath, "p1", "000", "foo2", 0);
     createBaseFile(basePath, "p1", "000", "foo3", 0);
 
-    List<StoragePathInfo> eligibleFiles = selector.listEligibleFiles(storage, 
inputPath, 0);
+    List<FileStatus> eligibleFiles = selector.listEligibleFiles(
+        (FileSystem) storage.getFileSystem(), inputPath, 0);
     assertEquals(1, eligibleFiles.size());
     assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1"));
   }
@@ -98,8 +101,8 @@ public class TestDFSPathSelectorCommonMethods extends 
HoodieSparkClientTestHarne
     createBaseFile(basePath, "p1", "000", "foo2", 1);
     createBaseFile(basePath, "p1", "000", "foo3", 1);
 
-    List<StoragePathInfo> eligibleFiles =
-        selector.listEligibleFiles(storage, inputPath, Long.MAX_VALUE);
+    List<FileStatus> eligibleFiles = selector.listEligibleFiles(
+        (FileSystem) storage.getFileSystem(), inputPath, Long.MAX_VALUE);
     assertEquals(0, eligibleFiles.size());
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
index 439f01600be..509463c58aa 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -24,6 +24,7 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -215,8 +216,8 @@ public class TestDatePartitionPathSelector extends 
HoodieSparkClientTestHarness
     createParentDirsBeforeDatePartitions(root, generateRandomStrings(), 
totalDepthBeforeDatePartitions, leafDirs);
     createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
 
-    List<String> paths = pathSelector.pruneDatePartitionPaths(context, 
storage, root.toString(),
-        LocalDate.parse(currentDate));
+    List<String> paths = pathSelector.pruneDatePartitionPaths(
+        context, (FileSystem) storage.getFileSystem(), root.toString(), 
LocalDate.parse(currentDate));
     assertEquals(expectedNumFiles, paths.size());
   }
 }


Reply via email to