This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.15.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 72dd5183ba54e2885792d481f30db35166bef218 Author: Y Ethan Guo <[email protected]> AuthorDate: Sat May 25 00:18:00 2024 -0700 [HUDI-7790] Revert changes in DFSPathSelector and UtilHelpers.readConfig (#11294) --- .../org/apache/hudi/cli/commands/SparkMain.java | 3 +- .../hudi/integ/testsuite/HoodieTestSuiteJob.java | 3 +- .../SparkDataSourceContinuousIngestTool.java | 3 +- .../helpers/DFSTestSuitePathSelector.java | 41 ++++++++-------- .../apache/hudi/utilities/HDFSParquetImporter.java | 3 +- .../org/apache/hudi/utilities/HoodieCleaner.java | 3 +- .../apache/hudi/utilities/HoodieClusteringJob.java | 4 +- .../org/apache/hudi/utilities/HoodieCompactor.java | 4 +- .../hudi/utilities/HoodieDataTableValidator.java | 3 +- .../hudi/utilities/HoodieDropPartitionsTool.java | 4 +- .../org/apache/hudi/utilities/HoodieIndexer.java | 4 +- .../utilities/HoodieMetadataTableValidator.java | 2 +- .../apache/hudi/utilities/HoodieRepairTool.java | 2 +- .../org/apache/hudi/utilities/TableSizeStats.java | 3 +- .../org/apache/hudi/utilities/UtilHelpers.java | 8 ++-- .../utilities/sources/helpers/DFSPathSelector.java | 54 ++++++++++------------ .../sources/helpers/DatePartitionPathSelector.java | 46 +++++++----------- .../streamer/HoodieMultiTableStreamer.java | 5 +- .../hudi/utilities/streamer/HoodieStreamer.java | 5 +- .../helpers/TestDFSPathSelectorCommonMethods.java | 19 ++++---- .../helpers/TestDatePartitionPathSelector.java | 5 +- 21 files changed, 105 insertions(+), 119 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index f8106ffc55c..fe13813490d 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -44,7 +44,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.storage.HoodieStorageUtils; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; import org.apache.hudi.table.marker.WriteMarkersFactory; @@ -483,7 +482,7 @@ public class SparkMain { String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException { TypedProperties properties = propsFilePath == null ? buildProperties(configs) - : readConfig(jsc.hadoopConfiguration(), new StoragePath(propsFilePath), configs).getProps(true); + : readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true); properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 8813129d748..70910357d7d 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -44,7 +44,6 @@ import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; @@ -115,7 +114,7 @@ public class HoodieTestSuiteJob { SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate(); this.fs = HadoopFSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration()); this.props = - UtilHelpers.readConfig(fs.getConf(), new StoragePath(cfg.propsFilePath), cfg.configs).getProps(); + UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps(); log.info("Creating workload generator with configs : {}", props.toString()); this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration()); this.keyGenerator = diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java index cbb2a27e54f..81bc4435623 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java @@ -22,7 +22,6 @@ package org.apache.hudi.integ.testsuite; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.utilities.HoodieRepairTool; import org.apache.hudi.utilities.IdentitySplitter; import org.apache.hudi.utilities.UtilHelpers; @@ -133,7 +132,7 @@ public class SparkDataSourceContinuousIngestTool { * @return the {@link TypedProperties} instance. */ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java index e2a2c19f666..70026aa5f7f 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java @@ -24,17 +24,20 @@ import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob; -import org.apache.hudi.storage.StoragePathInfo; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.utilities.config.DFSPathSelectorConfig; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -67,31 +70,31 @@ public class DFSTestSuitePathSelector extends DFSPathSelector { } // obtain all eligible files for the batch - List<StoragePathInfo> eligibleFiles = new ArrayList<>(); - List<StoragePathInfo> pathInfoList = storage.globEntries( - new StoragePath(getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH), - "*")); + List<FileStatus> eligibleFiles = new ArrayList<>(); + FileStatus[] fileStatuses = fs.globStatus( + new Path(getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH), "*")); // Say input data is as follow input/1, input/2, input/5 since 3,4 was rolled back and 5 is new generated data // checkpoint from the latest commit metadata will be 2 since 3,4 has been rolled back. We need to set the // next batch id correctly as 5 instead of 3 - Option<String> correctBatchIdDueToRollback = Option.fromJavaOptional(pathInfoList.stream() - .map(f -> f.getPath().toString().split("/")[ - f.getPath().toString().split("/").length - 1]) + Option<String> correctBatchIdDueToRollback = Option.fromJavaOptional(Arrays.stream(fileStatuses) + .map(f -> f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1]) .filter(bid1 -> Integer.parseInt(bid1) > lastBatchId) .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1), Integer.parseInt(bid2)))); - if (correctBatchIdDueToRollback.isPresent() - && Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) { + if (correctBatchIdDueToRollback.isPresent() && Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) { nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get()); } - log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " - + sourceLimit + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId); - for (StoragePathInfo pathInfo : pathInfoList) { - if (!pathInfo.isDirectory() || IGNORE_FILEPREFIX_LIST.stream() - .anyMatch(pfx -> pathInfo.getPath().getName().startsWith(pfx))) { + log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit + + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId); + for (FileStatus fileStatus : fileStatuses) { + if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream() + .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) { continue; - } else if (Integer.parseInt(pathInfo.getPath().getName()) > lastBatchId - && Integer.parseInt(pathInfo.getPath().getName()) <= nextBatchId) { - eligibleFiles.addAll(storage.listFiles(pathInfo.getPath())); + } else if (Integer.parseInt(fileStatus.getPath().getName()) > lastBatchId && Integer.parseInt(fileStatus.getPath() + .getName()) <= nextBatchId) { + RemoteIterator<LocatedFileStatus> files = fs.listFiles(fileStatus.getPath(), true); + while (files.hasNext()) { + eligibleFiles.add(files.next()); + } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index 3513f7c6760..1dc24fd31b8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.utilities.streamer.HoodieStreamer; import com.beust.jcommander.IValueValidator; @@ -114,7 +113,7 @@ public class HDFSParquetImporter implements Serializable { public int dataImport(JavaSparkContext jsc, int retry) { this.fs = HadoopFSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration()); this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) - : UtilHelpers.readConfig(fs.getConf(), new StoragePath(cfg.propsFilePath), cfg.configs).getProps(true); + : UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps(true); LOG.info("Starting data import with configs : " + props.toString()); int ret = -1; try { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java index e1d6a13cb9a..83f535191b9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java @@ -23,7 +23,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.storage.StoragePath; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -62,7 +61,7 @@ public class HoodieCleaner { * Filesystem used. */ this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) - : UtilHelpers.readConfig(jssc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs).getProps(true); + : UtilHelpers.readConfig(jssc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true); LOG.info("Creating Cleaner with configs : " + props.toString()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index b96b4610376..90c7d493705 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -29,11 +29,11 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.jetbrains.annotations.TestOnly; import org.slf4j.Logger; @@ -73,7 +73,7 @@ public class HoodieClusteringJob { } private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 90c66add046..82acce6a4eb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -30,7 +30,6 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy; @@ -38,6 +37,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; @@ -76,7 +76,7 @@ public class HoodieCompactor { } private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java index 459483e547c..9953b5225a3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java @@ -39,6 +39,7 @@ import org.apache.hudi.table.repair.RepairUtils; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; @@ -139,7 +140,7 @@ public class HoodieDataTableValidator implements Serializable { * @return the {@link TypedProperties} instance. */ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java index 17210d25639..05a5742e841 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java @@ -34,7 +34,6 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncConfigHolder; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.table.HoodieSparkTable; @@ -42,6 +41,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -136,7 +136,7 @@ public class HoodieDropPartitionsTool implements Serializable { * @return the {@link TypedProperties} instance. */ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java index 13d168a24c0..5c626a53ae7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java @@ -31,10 +31,10 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.hudi.storage.StoragePath; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.jetbrains.annotations.TestOnly; import org.slf4j.Logger; @@ -105,7 +105,7 @@ public class HoodieIndexer { } private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index a9bade03137..bfb9e18af1b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -258,7 +258,7 @@ public class HoodieMetadataTableValidator implements Serializable { * @return the {@link TypedProperties} instance. */ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java index b2bb34ede3b..237e0cb2263 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java @@ -518,7 +518,7 @@ public class HoodieRepairTool { * @return the {@link TypedProperties} instance. */ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java index ff655dfd017..a9b0f70bca9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java @@ -34,7 +34,6 @@ import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; import com.beust.jcommander.JCommander; @@ -132,7 +131,7 @@ public class TableSizeStats implements Serializable { * @return the {@link TypedProperties} instance. */ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { - return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs) + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) .getProps(true); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index abf0558e5ff..74cc775718a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -118,6 +118,7 @@ import java.util.function.Supplier; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; /** * Bunch of helper methods. @@ -242,13 +243,14 @@ public class UtilHelpers { } public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, - StoragePath cfgPath, + Path cfgPath, List<String> overriddenProps) { - DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath); + StoragePath storagePath = convertToStoragePath(cfgPath); + DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, storagePath); try { if (!overriddenProps.isEmpty()) { LOG.info("Adding overridden properties to file properties."); - conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))), cfgPath); + conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))), storagePath); } } catch (IOException ioe) { throw new HoodieIOException("Unexpected error adding config overrides", ioe); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java index 62f182df359..257c015c53b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java @@ -26,13 +26,12 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.HoodieStorage; -import org.apache.hudi.storage.HoodieStorageUtils; -import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.utilities.config.DFSPathSelectorConfig; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,15 +65,15 @@ public class DFSPathSelector implements Serializable { protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_"); - protected final transient HoodieStorage storage; + protected final transient FileSystem fs; protected final TypedProperties props; public DFSPathSelector(TypedProperties props, Configuration hadoopConf) { checkRequiredConfigProperties( props, Collections.singletonList(DFSPathSelectorConfig.ROOT_INPUT_PATH)); this.props = props; - this.storage = HoodieStorageUtils.getStorage( - getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH), HadoopFSUtils.getStorageConf(hadoopConf)); + this.fs = HadoopFSUtils.getFs( + getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH), hadoopConf); } /** @@ -125,19 +124,16 @@ public class DFSPathSelector implements Serializable { log.info("Root path => " + getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH) + " source limit => " + sourceLimit); long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE); - List<StoragePathInfo> eligibleFiles = listEligibleFiles( - storage, new StoragePath(getStringWithAltKeys(props, - DFSPathSelectorConfig.ROOT_INPUT_PATH)), - lastCheckpointTime); + List<FileStatus> eligibleFiles = listEligibleFiles( + fs, new Path(getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH)), lastCheckpointTime); // sort them by modification time. - eligibleFiles.sort(Comparator.comparingLong(StoragePathInfo::getModificationTime)); + eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime)); // Filter based on checkpoint & input size, if needed long currentBytes = 0; long newCheckpointTime = lastCheckpointTime; - List<StoragePathInfo> filteredFiles = new ArrayList<>(); - for (StoragePathInfo f : eligibleFiles) { - if (currentBytes + f.getLength() >= sourceLimit - && f.getModificationTime() > newCheckpointTime) { + List<FileStatus> filteredFiles = new ArrayList<>(); + for (FileStatus f : eligibleFiles) { + if (currentBytes + f.getLen() >= sourceLimit && f.getModificationTime() > newCheckpointTime) { // we have enough data, we are done // Also, we've read up to a file with a newer modification time // so that some files with the same modification time won't be skipped in next read @@ -145,7 +141,7 @@ public class DFSPathSelector implements Serializable { } newCheckpointTime = f.getModificationTime(); - currentBytes += f.getLength(); + currentBytes += f.getLen(); filteredFiles.add(f); } @@ -155,9 +151,7 @@ public class DFSPathSelector implements Serializable { } // read the files out. - String pathStr = - filteredFiles.stream().map(f -> f.getPath().toString()) - .collect(Collectors.joining(",")); + String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(",")); return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(newCheckpointTime)); } catch (IOException ioe) { @@ -168,17 +162,19 @@ public class DFSPathSelector implements Serializable { /** * List files recursively, filter out illegible files/directories while doing so. */ - protected List<StoragePathInfo> listEligibleFiles(HoodieStorage storage, StoragePath path, - long lastCheckpointTime) throws IOException { + protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException { // skip files/dirs whose names start with (_, ., etc) - List<StoragePathInfo> pathInfoList = storage.listDirectEntries(path, file -> + FileStatus[] statuses = fs.listStatus(path, file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx))); - List<StoragePathInfo> res = new ArrayList<>(); - for (StoragePathInfo pathInfo : pathInfoList) { - if (pathInfo.isDirectory()) { - res.addAll(listEligibleFiles(storage, pathInfo.getPath(), lastCheckpointTime)); - } else if (pathInfo.getModificationTime() > lastCheckpointTime && pathInfo.getLength() > 0) { - res.add(pathInfo); + List<FileStatus> res = new ArrayList<>(); + for (FileStatus status : statuses) { + if (status.isDirectory()) { + // avoid infinite loop + if (!status.isSymlink()) { + res.addAll(listEligibleFiles(fs, status.getPath(), lastCheckpointTime)); + } + } else if (status.getModificationTime() > lastCheckpointTime && status.getLen() > 0) { + res.add(status); } } return res; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java index ab9ccbb8ca7..70acd7ca527 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java @@ -24,12 +24,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.storage.HoodieStorage; -import org.apache.hudi.storage.HoodieStorageUtils; -import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.storage.StoragePathInfo; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.utilities.config.DatePartitionPathSelectorConfig; import org.apache.hadoop.conf.Configuration; @@ -136,28 +131,25 @@ public class DatePartitionPathSelector extends DFSPathSelector { + currentDate); long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE); HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext); - StorageConfiguration<?> storageConf = storage.getConf(); + HadoopStorageConfiguration storageConf = new HadoopStorageConfiguration(fs.getConf()); List<String> prunedPartitionPaths = pruneDatePartitionPaths( - context, storage, getStringWithAltKeys(props, ROOT_INPUT_PATH), - currentDate); + context, fs, getStringWithAltKeys(props, ROOT_INPUT_PATH), currentDate); - List<StoragePathInfo> eligibleFiles = context.flatMap(prunedPartitionPaths, + List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths, path -> { - HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConf); - return listEligibleFiles(storage, new StoragePath(path), lastCheckpointTime).stream(); + FileSystem fs = new Path(path).getFileSystem(storageConf.unwrap()); + return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream(); }, partitionsListParallelism); // sort them by modification time ascending. - List<StoragePathInfo> sortedEligibleFiles = eligibleFiles.stream() - .sorted(Comparator.comparingLong(StoragePathInfo::getModificationTime)) - .collect(Collectors.toList()); + List<FileStatus> sortedEligibleFiles = eligibleFiles.stream() + .sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList()); // Filter based on checkpoint & input size, if needed long currentBytes = 0; long newCheckpointTime = lastCheckpointTime; - List<StoragePathInfo> filteredFiles = new ArrayList<>(); - for (StoragePathInfo f : sortedEligibleFiles) { - if (currentBytes + f.getLength() >= sourceLimit - && f.getModificationTime() > newCheckpointTime) { + List<FileStatus> filteredFiles = new ArrayList<>(); + for (FileStatus f : sortedEligibleFiles) { + if (currentBytes + f.getLen() >= sourceLimit && f.getModificationTime() > newCheckpointTime) { // we have enough data, we are done // Also, we've read up to a file with a newer modification time // so that some files with the same modification time won't be skipped in next read @@ -165,7 +157,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { } newCheckpointTime = f.getModificationTime(); - currentBytes += f.getLength(); + currentBytes += f.getLen(); filteredFiles.add(f); } @@ -175,9 +167,7 @@ public class DatePartitionPathSelector extends DFSPathSelector { } // read the files out. - String pathStr = - filteredFiles.stream().map(f -> f.getPath().toString()) - .collect(Collectors.joining(",")); + String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(",")); return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(newCheckpointTime)); } @@ -186,25 +176,21 @@ public class DatePartitionPathSelector extends DFSPathSelector { * Prunes date level partitions to last few days configured by 'NUM_PREV_DAYS_TO_LIST' from * 'CURRENT_DATE'. Parallelizes listing by leveraging HoodieSparkEngineContext's methods. */ - public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, - HoodieStorage storage, - String rootPath, LocalDate currentDate) { + public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) { List<String> partitionPaths = new ArrayList<>(); // get all partition paths before date partition level partitionPaths.add(rootPath); if (datePartitionDepth <= 0) { return partitionPaths; } - StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy( - ((FileSystem) storage.getFileSystem()).getConf()); + HadoopStorageConfiguration storageConf = new HadoopStorageConfiguration(fs.getConf()); for (int i = 0; i < datePartitionDepth; i++) { partitionPaths = context.flatMap(partitionPaths, path -> { Path subDir = new Path(path); FileSystem fileSystem = subDir.getFileSystem(storageConf.unwrap()); // skip files/dirs whose names start with (_, ., etc) FileStatus[] statuses = fileSystem.listStatus(subDir, - file -> IGNORE_FILEPREFIX_LIST.stream() - .noneMatch(pfx -> file.getName().startsWith(pfx))); + file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx))); List<String> res = new ArrayList<>(); for (FileStatus status : statuses) { res.add(status.getPath().toString()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java index f1116150be3..a637f7fbbff 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncTool; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.utilities.IdentitySplitter; import org.apache.hudi.utilities.UtilHelpers; @@ -90,7 +89,7 @@ public class HoodieMultiTableStreamer { FileSystem fs = HadoopFSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration()); configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder; checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs); - TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), new StoragePath(commonPropsFile), new ArrayList<String>()).getProps(); + TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), new Path(commonPropsFile), new ArrayList<String>()).getProps(); //get the tables to be ingested and their corresponding config files from this properties instance populateTableExecutionContextList(commonProperties, configFolder, fs, config); } @@ -131,7 +130,7 @@ public class HoodieMultiTableStreamer { String configFilePath = getStringWithAltKeys(properties, configProp, oldConfigProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable)); checkIfTableConfigFileExists(configFolder, fs, configFilePath); - TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new StoragePath(configFilePath), new ArrayList<>()).getProps(); + TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<>()).getProps(); properties.forEach((k, v) -> { if (tableProperties.get(k) == null) { tableProperties.setProperty(k.toString(), v.toString()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 4ea84ff7a5e..4fe25870201 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -74,6 +74,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -178,7 +179,7 @@ public class HoodieStreamer implements Serializable { } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) { hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps()); } else { - hoodieConfig.setAll(readConfig(hadoopConf, new StoragePath(cfg.propsFilePath), cfg.configs).getProps()); + hoodieConfig.setAll(readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps()); } // set any configs that Deltastreamer has to override explicitly @@ -447,7 +448,7 @@ public class HoodieStreamer implements Serializable { public static TypedProperties getProps(Configuration conf, Config cfg) { return cfg.propsFilePath.isEmpty() ? buildProperties(cfg.configs) - : readConfig(conf, new StoragePath(cfg.propsFilePath), cfg.configs).getProps(); + : readConfig(conf, new Path(cfg.propsFilePath), cfg.configs).getProps(); } @Override diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java index a31938c439b..684261cab84 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java @@ -23,10 +23,11 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestDFSPathSelectorCommonMethods extends HoodieSparkClientTestHarness { TypedProperties props; - StoragePath inputPath; + Path inputPath; @BeforeEach void setUp() { @@ -56,7 +57,7 @@ public class TestDFSPathSelectorCommonMethods extends HoodieSparkClientTestHarne props = new TypedProperties(); props.setProperty(ROOT_INPUT_PATH.key(), basePath); props.setProperty(PARTITIONS_LIST_PARALLELISM.key(), "1"); - inputPath = new StoragePath(basePath); + inputPath = new Path(basePath); } @AfterEach @@ -72,7 +73,8 @@ public class TestDFSPathSelectorCommonMethods extends HoodieSparkClientTestHarne createBaseFile(basePath, "p1", "000", ".foo2", 1); createBaseFile(basePath, "p1", "000", "_foo3", 1); - List<StoragePathInfo> eligibleFiles = selector.listEligibleFiles(storage, inputPath, 0); + List<FileStatus> eligibleFiles = selector.listEligibleFiles( + (FileSystem) storage.getFileSystem(), inputPath, 0); assertEquals(1, eligibleFiles.size()); assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1")); } @@ -85,7 +87,8 @@ public class TestDFSPathSelectorCommonMethods extends HoodieSparkClientTestHarne createBaseFile(basePath, "p1", "000", "foo2", 0); createBaseFile(basePath, "p1", "000", "foo3", 0); - List<StoragePathInfo> eligibleFiles = selector.listEligibleFiles(storage, inputPath, 0); + List<FileStatus> eligibleFiles = selector.listEligibleFiles( + (FileSystem) storage.getFileSystem(), inputPath, 0); assertEquals(1, eligibleFiles.size()); assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1")); } @@ -98,8 +101,8 @@ public class TestDFSPathSelectorCommonMethods extends HoodieSparkClientTestHarne createBaseFile(basePath, "p1", "000", "foo2", 1); createBaseFile(basePath, "p1", "000", "foo3", 1); - List<StoragePathInfo> eligibleFiles = - selector.listEligibleFiles(storage, inputPath, Long.MAX_VALUE); + List<FileStatus> eligibleFiles = selector.listEligibleFiles( + (FileSystem) storage.getFileSystem(), inputPath, Long.MAX_VALUE); assertEquals(0, eligibleFiles.size()); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java index 439f01600be..509463c58aa 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java @@ -24,6 +24,7 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; +import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -215,8 +216,8 @@ public class TestDatePartitionPathSelector extends HoodieSparkClientTestHarness createParentDirsBeforeDatePartitions(root, generateRandomStrings(), totalDepthBeforeDatePartitions, leafDirs); createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat); - List<String> paths = pathSelector.pruneDatePartitionPaths(context, storage, root.toString(), - LocalDate.parse(currentDate)); + List<String> paths = pathSelector.pruneDatePartitionPaths( + context, (FileSystem) storage.getFileSystem(), root.toString(), LocalDate.parse(currentDate)); assertEquals(expectedNumFiles, paths.size()); } }
