This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eba95af7ee6 [HUDI-7790] Revert changes in DFSPathSelector and UtilHelpers.readConfig (#11293)
eba95af7ee6 is described below
commit eba95af7ee6e5b8228a5e59e63d92bc32a618e7c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri May 24 21:15:22 2024 -0700
[HUDI-7790] Revert changes in DFSPathSelector and UtilHelpers.readConfig (#11293)
---
.../org/apache/hudi/cli/commands/SparkMain.java | 3 +-
.../hudi/integ/testsuite/HoodieTestSuiteJob.java | 3 +-
.../SparkDataSourceContinuousIngestTool.java | 3 +-
.../helpers/DFSTestSuitePathSelector.java | 41 ++++++++--------
.../apache/hudi/utilities/HDFSParquetImporter.java | 3 +-
.../hudi/utilities/HoodieDataTableValidator.java | 3 +-
.../hudi/utilities/HoodieDropPartitionsTool.java | 4 +-
.../org/apache/hudi/utilities/HoodieIndexer.java | 4 +-
.../utilities/HoodieMetadataTableValidator.java | 2 +-
.../apache/hudi/utilities/HoodieRepairTool.java | 2 +-
.../org/apache/hudi/utilities/TableSizeStats.java | 3 +-
.../org/apache/hudi/utilities/UtilHelpers.java | 10 ++--
.../multitable/HoodieMultiTableServicesMain.java | 3 +-
.../utilities/sources/helpers/DFSPathSelector.java | 54 ++++++++++------------
.../sources/helpers/DatePartitionPathSelector.java | 46 +++++++-----------
.../streamer/HoodieMultiTableStreamer.java | 5 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 5 +-
.../helpers/TestDFSPathSelectorCommonMethods.java | 19 ++++----
.../helpers/TestDatePartitionPathSelector.java | 5 +-
19 files changed, 102 insertions(+), 116 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 0a43233db8a..bb63a051d9b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -44,7 +44,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -483,7 +482,7 @@ public class SparkMain {
String payloadClassName, String
enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
TypedProperties properties = propsFilePath == null ?
buildProperties(configs)
- : readConfig(jsc.hadoopConfiguration(), new
StoragePath(propsFilePath), configs).getProps(true);
+ : readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath),
configs).getProps(true);
properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 8813129d748..70910357d7d 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -44,7 +44,6 @@ import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -115,7 +114,7 @@ public class HoodieTestSuiteJob {
SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
this.fs = HadoopFSUtils.getFs(cfg.inputBasePath,
jsc.hadoopConfiguration());
this.props =
- UtilHelpers.readConfig(fs.getConf(), new
StoragePath(cfg.propsFilePath), cfg.configs).getProps();
+ UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath),
cfg.configs).getProps();
log.info("Creating workload generator with configs : {}",
props.toString());
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator =
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
index cbb2a27e54f..81bc4435623 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
@@ -22,7 +22,6 @@ package org.apache.hudi.integ.testsuite;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.HoodieRepairTool;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -133,7 +132,7 @@ public class SparkDataSourceContinuousIngestTool {
* @return the {@link TypedProperties} instance.
*/
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs)
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index e2a2c19f666..70026aa5f7f 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -24,17 +24,20 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
-import org.apache.hudi.storage.StoragePathInfo;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -67,31 +70,31 @@ public class DFSTestSuitePathSelector extends
DFSPathSelector {
}
// obtain all eligible files for the batch
- List<StoragePathInfo> eligibleFiles = new ArrayList<>();
- List<StoragePathInfo> pathInfoList = storage.globEntries(
- new StoragePath(getStringWithAltKeys(props,
DFSPathSelectorConfig.ROOT_INPUT_PATH),
- "*"));
+ List<FileStatus> eligibleFiles = new ArrayList<>();
+ FileStatus[] fileStatuses = fs.globStatus(
+ new Path(getStringWithAltKeys(props,
DFSPathSelectorConfig.ROOT_INPUT_PATH), "*"));
// Say input data is as follow input/1, input/2, input/5 since 3,4 was
rolled back and 5 is new generated data
// checkpoint from the latest commit metadata will be 2 since 3,4 has
been rolled back. We need to set the
// next batch id correctly as 5 instead of 3
- Option<String> correctBatchIdDueToRollback =
Option.fromJavaOptional(pathInfoList.stream()
- .map(f -> f.getPath().toString().split("/")[
- f.getPath().toString().split("/").length - 1])
+ Option<String> correctBatchIdDueToRollback =
Option.fromJavaOptional(Arrays.stream(fileStatuses)
+ .map(f ->
f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
.filter(bid1 -> Integer.parseInt(bid1) > lastBatchId)
.min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1),
Integer.parseInt(bid2))));
- if (correctBatchIdDueToRollback.isPresent()
- && Integer.parseInt(correctBatchIdDueToRollback.get()) >
nextBatchId) {
+ if (correctBatchIdDueToRollback.isPresent() &&
Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get());
}
- log.info("Using DFSTestSuitePathSelector, checkpoint: " +
lastCheckpointStr + " sourceLimit: "
- + sourceLimit + " lastBatchId: " + lastBatchId + " nextBatchId: " +
nextBatchId);
- for (StoragePathInfo pathInfo : pathInfoList) {
- if (!pathInfo.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
- .anyMatch(pfx -> pathInfo.getPath().getName().startsWith(pfx))) {
+ log.info("Using DFSTestSuitePathSelector, checkpoint: " +
lastCheckpointStr + " sourceLimit: " + sourceLimit
+ + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
+ for (FileStatus fileStatus : fileStatuses) {
+ if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
+ .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
- } else if (Integer.parseInt(pathInfo.getPath().getName()) > lastBatchId
- && Integer.parseInt(pathInfo.getPath().getName()) <= nextBatchId) {
- eligibleFiles.addAll(storage.listFiles(pathInfo.getPath()));
+ } else if (Integer.parseInt(fileStatus.getPath().getName()) >
lastBatchId && Integer.parseInt(fileStatus.getPath()
+ .getName()) <= nextBatchId) {
+ RemoteIterator<LocatedFileStatus> files =
fs.listFiles(fileStatus.getPath(), true);
+ while (files.hasNext()) {
+ eligibleFiles.add(files.next());
+ }
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 3513f7c6760..1dc24fd31b8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import com.beust.jcommander.IValueValidator;
@@ -114,7 +113,7 @@ public class HDFSParquetImporter implements Serializable {
public int dataImport(JavaSparkContext jsc, int retry) {
this.fs = HadoopFSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ?
UtilHelpers.buildProperties(cfg.configs)
- : UtilHelpers.readConfig(fs.getConf(), new
StoragePath(cfg.propsFilePath), cfg.configs).getProps(true);
+ : UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath),
cfg.configs).getProps(true);
LOG.info("Starting data import with configs : " + props.toString());
int ret = -1;
try {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
index a2c578b1143..6f4c3c87eb0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.repair.RepairUtils;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
@@ -139,7 +140,7 @@ public class HoodieDataTableValidator implements
Serializable {
* @return the {@link TypedProperties} instance.
*/
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs)
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
index 6a843942dad..8de01545354 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -37,7 +37,6 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.table.HoodieSparkTable;
@@ -45,6 +44,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -139,7 +139,7 @@ public class HoodieDropPartitionsTool implements
Serializable {
* @return the {@link TypedProperties} instance.
*/
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs)
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index 5469ff583d1..03b6d934b5f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -31,10 +31,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StoragePath;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
@@ -106,7 +106,7 @@ public class HoodieIndexer {
}
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
HoodieIndexer.Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs)
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 8cc804cacac..ca7b92af836 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -258,7 +258,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
* @return the {@link TypedProperties} instance.
*/
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs)
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index d5e6475c301..ee6b8eec23c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -518,7 +518,7 @@ public class HoodieRepairTool {
* @return the {@link TypedProperties} instance.
*/
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs)
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
index ff655dfd017..a9b0f70bca9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
@@ -34,7 +34,6 @@ import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import com.beust.jcommander.JCommander;
@@ -132,7 +131,7 @@ public class TableSizeStats implements Serializable {
* @return the {@link TypedProperties} instance.
*/
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs)
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index d0acffe5d17..8e4b5a6ce71 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -118,6 +118,7 @@ import java.util.function.Supplier;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
/**
* Bunch of helper methods.
@@ -242,13 +243,14 @@ public class UtilHelpers {
}
public static DFSPropertiesConfiguration readConfig(Configuration
hadoopConfig,
- StoragePath cfgPath,
+ Path cfgPath,
List<String>
overriddenProps) {
- DFSPropertiesConfiguration conf = new
DFSPropertiesConfiguration(hadoopConfig, cfgPath);
+ StoragePath storagePath = convertToStoragePath(cfgPath);
+ DFSPropertiesConfiguration conf = new
DFSPropertiesConfiguration(hadoopConfig, storagePath);
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
- conf.addPropsFromStream(new BufferedReader(new
StringReader(String.join("\n", overriddenProps))), cfgPath);
+ conf.addPropsFromStream(new BufferedReader(new
StringReader(String.join("\n", overriddenProps))), storagePath);
}
} catch (IOException ioe) {
throw new HoodieIOException("Unexpected error adding config overrides",
ioe);
@@ -274,7 +276,7 @@ public class UtilHelpers {
public static TypedProperties buildProperties(Configuration hadoopConf,
String propsFilePath, List<String> props) {
return StringUtils.isNullOrEmpty(propsFilePath)
? UtilHelpers.buildProperties(props)
- : UtilHelpers.readConfig(hadoopConf, new StoragePath(propsFilePath),
props)
+ : UtilHelpers.readConfig(hadoopConf, new Path(propsFilePath), props)
.getProps(true);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
index 33c412ced4a..10258876fbe 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.multitable;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.storage.StoragePath;
import
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.hudi.utilities.IdentitySplitter;
@@ -128,7 +127,7 @@ public class HoodieMultiTableServicesMain {
}
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
- return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
StoragePath(cfg.propsFilePath), cfg.configs).getProps(true);
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs).getProps(true);
}
private boolean pathExists(String path) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 62f182df359..257c015c53b 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -26,13 +26,12 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,15 +65,15 @@ public class DFSPathSelector implements Serializable {
protected static final List<String> IGNORE_FILEPREFIX_LIST =
Arrays.asList(".", "_");
- protected final transient HoodieStorage storage;
+ protected final transient FileSystem fs;
protected final TypedProperties props;
public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
checkRequiredConfigProperties(
props,
Collections.singletonList(DFSPathSelectorConfig.ROOT_INPUT_PATH));
this.props = props;
- this.storage = HoodieStorageUtils.getStorage(
- getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH),
HadoopFSUtils.getStorageConf(hadoopConf));
+ this.fs = HadoopFSUtils.getFs(
+ getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH),
hadoopConf);
}
/**
@@ -125,19 +124,16 @@ public class DFSPathSelector implements Serializable {
log.info("Root path => " + getStringWithAltKeys(props,
DFSPathSelectorConfig.ROOT_INPUT_PATH)
+ " source limit => " + sourceLimit);
long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
- List<StoragePathInfo> eligibleFiles = listEligibleFiles(
- storage, new StoragePath(getStringWithAltKeys(props,
- DFSPathSelectorConfig.ROOT_INPUT_PATH)),
- lastCheckpointTime);
+ List<FileStatus> eligibleFiles = listEligibleFiles(
+ fs, new Path(getStringWithAltKeys(props,
DFSPathSelectorConfig.ROOT_INPUT_PATH)), lastCheckpointTime);
// sort them by modification time.
-
eligibleFiles.sort(Comparator.comparingLong(StoragePathInfo::getModificationTime));
+
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
long newCheckpointTime = lastCheckpointTime;
- List<StoragePathInfo> filteredFiles = new ArrayList<>();
- for (StoragePathInfo f : eligibleFiles) {
- if (currentBytes + f.getLength() >= sourceLimit
- && f.getModificationTime() > newCheckpointTime) {
+ List<FileStatus> filteredFiles = new ArrayList<>();
+ for (FileStatus f : eligibleFiles) {
+ if (currentBytes + f.getLen() >= sourceLimit &&
f.getModificationTime() > newCheckpointTime) {
// we have enough data, we are done
// Also, we've read up to a file with a newer modification time
// so that some files with the same modification time won't be
skipped in next read
@@ -145,7 +141,7 @@ public class DFSPathSelector implements Serializable {
}
newCheckpointTime = f.getModificationTime();
- currentBytes += f.getLength();
+ currentBytes += f.getLen();
filteredFiles.add(f);
}
@@ -155,9 +151,7 @@ public class DFSPathSelector implements Serializable {
}
// read the files out.
- String pathStr =
- filteredFiles.stream().map(f -> f.getPath().toString())
- .collect(Collectors.joining(","));
+ String pathStr = filteredFiles.stream().map(f ->
f.getPath().toString()).collect(Collectors.joining(","));
return new ImmutablePair<>(Option.ofNullable(pathStr),
String.valueOf(newCheckpointTime));
} catch (IOException ioe) {
@@ -168,17 +162,19 @@ public class DFSPathSelector implements Serializable {
/**
* List files recursively, filter out illegible files/directories while
doing so.
*/
- protected List<StoragePathInfo> listEligibleFiles(HoodieStorage storage,
StoragePath path,
- long lastCheckpointTime)
throws IOException {
+ protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long
lastCheckpointTime) throws IOException {
// skip files/dirs whose names start with (_, ., etc)
- List<StoragePathInfo> pathInfoList = storage.listDirectEntries(path, file
->
+ FileStatus[] statuses = fs.listStatus(path, file ->
IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx ->
file.getName().startsWith(pfx)));
- List<StoragePathInfo> res = new ArrayList<>();
- for (StoragePathInfo pathInfo : pathInfoList) {
- if (pathInfo.isDirectory()) {
- res.addAll(listEligibleFiles(storage, pathInfo.getPath(),
lastCheckpointTime));
- } else if (pathInfo.getModificationTime() > lastCheckpointTime &&
pathInfo.getLength() > 0) {
- res.add(pathInfo);
+ List<FileStatus> res = new ArrayList<>();
+ for (FileStatus status : statuses) {
+ if (status.isDirectory()) {
+ // avoid infinite loop
+ if (!status.isSymlink()) {
+ res.addAll(listEligibleFiles(fs, status.getPath(),
lastCheckpointTime));
+ }
+ } else if (status.getModificationTime() > lastCheckpointTime &&
status.getLen() > 0) {
+ res.add(status);
}
}
return res;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index ab9ccbb8ca7..70acd7ca527 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -24,12 +24,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.utilities.config.DatePartitionPathSelectorConfig;
import org.apache.hadoop.conf.Configuration;
@@ -136,28 +131,25 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
+ currentDate);
long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
HoodieSparkEngineContext context = new
HoodieSparkEngineContext(sparkContext);
- StorageConfiguration<?> storageConf = storage.getConf();
+ HadoopStorageConfiguration storageConf = new
HadoopStorageConfiguration(fs.getConf());
List<String> prunedPartitionPaths = pruneDatePartitionPaths(
- context, storage, getStringWithAltKeys(props, ROOT_INPUT_PATH),
- currentDate);
+ context, fs, getStringWithAltKeys(props, ROOT_INPUT_PATH),
currentDate);
- List<StoragePathInfo> eligibleFiles = context.flatMap(prunedPartitionPaths,
+ List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths,
path -> {
- HoodieStorage storage = HoodieStorageUtils.getStorage(path,
storageConf);
- return listEligibleFiles(storage, new StoragePath(path),
lastCheckpointTime).stream();
+ FileSystem fs = new Path(path).getFileSystem(storageConf.unwrap());
+ return listEligibleFiles(fs, new Path(path),
lastCheckpointTime).stream();
}, partitionsListParallelism);
// sort them by modification time ascending.
- List<StoragePathInfo> sortedEligibleFiles = eligibleFiles.stream()
- .sorted(Comparator.comparingLong(StoragePathInfo::getModificationTime))
- .collect(Collectors.toList());
+ List<FileStatus> sortedEligibleFiles = eligibleFiles.stream()
+
.sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList());
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
long newCheckpointTime = lastCheckpointTime;
- List<StoragePathInfo> filteredFiles = new ArrayList<>();
- for (StoragePathInfo f : sortedEligibleFiles) {
- if (currentBytes + f.getLength() >= sourceLimit
- && f.getModificationTime() > newCheckpointTime) {
+ List<FileStatus> filteredFiles = new ArrayList<>();
+ for (FileStatus f : sortedEligibleFiles) {
+ if (currentBytes + f.getLen() >= sourceLimit && f.getModificationTime()
> newCheckpointTime) {
// we have enough data, we are done
// Also, we've read up to a file with a newer modification time
// so that some files with the same modification time won't be skipped
in next read
@@ -165,7 +157,7 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
}
newCheckpointTime = f.getModificationTime();
- currentBytes += f.getLength();
+ currentBytes += f.getLen();
filteredFiles.add(f);
}
@@ -175,9 +167,7 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
}
// read the files out.
- String pathStr =
- filteredFiles.stream().map(f -> f.getPath().toString())
- .collect(Collectors.joining(","));
+ String pathStr = filteredFiles.stream().map(f ->
f.getPath().toString()).collect(Collectors.joining(","));
return new ImmutablePair<>(Option.ofNullable(pathStr),
String.valueOf(newCheckpointTime));
}
@@ -186,25 +176,21 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
* Prunes date level partitions to last few days configured by
'NUM_PREV_DAYS_TO_LIST' from
* 'CURRENT_DATE'. Parallelizes listing by leveraging
HoodieSparkEngineContext's methods.
*/
- public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context,
- HoodieStorage storage,
- String rootPath, LocalDate
currentDate) {
+ public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext
context, FileSystem fs, String rootPath, LocalDate currentDate) {
List<String> partitionPaths = new ArrayList<>();
// get all partition paths before date partition level
partitionPaths.add(rootPath);
if (datePartitionDepth <= 0) {
return partitionPaths;
}
- StorageConfiguration<Configuration> storageConf =
HadoopFSUtils.getStorageConfWithCopy(
- ((FileSystem) storage.getFileSystem()).getConf());
+ HadoopStorageConfiguration storageConf = new
HadoopStorageConfiguration(fs.getConf());
for (int i = 0; i < datePartitionDepth; i++) {
partitionPaths = context.flatMap(partitionPaths, path -> {
Path subDir = new Path(path);
FileSystem fileSystem = subDir.getFileSystem(storageConf.unwrap());
// skip files/dirs whose names start with (_, ., etc)
FileStatus[] statuses = fileSystem.listStatus(subDir,
- file -> IGNORE_FILEPREFIX_LIST.stream()
- .noneMatch(pfx -> file.getName().startsWith(pfx)));
+ file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx ->
file.getName().startsWith(pfx)));
List<String> res = new ArrayList<>();
for (FileStatus status : statuses) {
res.add(status.getPath().toString());
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
index f1116150be3..a637f7fbbff 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -90,7 +89,7 @@ public class HoodieMultiTableStreamer {
FileSystem fs = HadoopFSUtils.getFs(commonPropsFile,
jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ?
configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
- TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(),
new StoragePath(commonPropsFile), new ArrayList<String>()).getProps();
+ TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(),
new Path(commonPropsFile), new ArrayList<String>()).getProps();
//get the tables to be ingested and their corresponding config files from
this properties instance
populateTableExecutionContextList(commonProperties, configFolder, fs,
config);
}
@@ -131,7 +130,7 @@ public class HoodieMultiTableStreamer {
String configFilePath = getStringWithAltKeys(properties, configProp,
oldConfigProp,
Helpers.getDefaultConfigFilePath(configFolder, database,
currentTable));
checkIfTableConfigFileExists(configFolder, fs, configFilePath);
- TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(),
new StoragePath(configFilePath), new ArrayList<>()).getProps();
+ TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(),
new Path(configFilePath), new ArrayList<>()).getProps();
properties.forEach((k, v) -> {
if (tableProperties.get(k) == null) {
tableProperties.setProperty(k.toString(), v.toString());
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index aa0b89c712f..1905cfe6f31 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -74,6 +74,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -178,7 +179,7 @@ public class HoodieStreamer implements Serializable {
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES))
{
hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
} else {
- hoodieConfig.setAll(readConfig(hadoopConf, new
StoragePath(cfg.propsFilePath), cfg.configs).getProps());
+ hoodieConfig.setAll(readConfig(hadoopConf, new Path(cfg.propsFilePath),
cfg.configs).getProps());
}
// set any configs that Deltastreamer has to override explicitly
@@ -447,7 +448,7 @@ public class HoodieStreamer implements Serializable {
public static TypedProperties getProps(Configuration conf, Config cfg) {
return cfg.propsFilePath.isEmpty()
? buildProperties(cfg.configs)
- : readConfig(conf, new StoragePath(cfg.propsFilePath),
cfg.configs).getProps();
+ : readConfig(conf, new Path(cfg.propsFilePath),
cfg.configs).getProps();
}
@Override
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
index a31938c439b..684261cab84 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
@@ -23,10 +23,11 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -46,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestDFSPathSelectorCommonMethods extends
HoodieSparkClientTestHarness {
TypedProperties props;
- StoragePath inputPath;
+ Path inputPath;
@BeforeEach
void setUp() {
@@ -56,7 +57,7 @@ public class TestDFSPathSelectorCommonMethods extends
HoodieSparkClientTestHarne
props = new TypedProperties();
props.setProperty(ROOT_INPUT_PATH.key(), basePath);
props.setProperty(PARTITIONS_LIST_PARALLELISM.key(), "1");
- inputPath = new StoragePath(basePath);
+ inputPath = new Path(basePath);
}
@AfterEach
@@ -72,7 +73,8 @@ public class TestDFSPathSelectorCommonMethods extends
HoodieSparkClientTestHarne
createBaseFile(basePath, "p1", "000", ".foo2", 1);
createBaseFile(basePath, "p1", "000", "_foo3", 1);
- List<StoragePathInfo> eligibleFiles = selector.listEligibleFiles(storage,
inputPath, 0);
+ List<FileStatus> eligibleFiles = selector.listEligibleFiles(
+ (FileSystem) storage.getFileSystem(), inputPath, 0);
assertEquals(1, eligibleFiles.size());
assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1"));
}
@@ -85,7 +87,8 @@ public class TestDFSPathSelectorCommonMethods extends
HoodieSparkClientTestHarne
createBaseFile(basePath, "p1", "000", "foo2", 0);
createBaseFile(basePath, "p1", "000", "foo3", 0);
- List<StoragePathInfo> eligibleFiles = selector.listEligibleFiles(storage,
inputPath, 0);
+ List<FileStatus> eligibleFiles = selector.listEligibleFiles(
+ (FileSystem) storage.getFileSystem(), inputPath, 0);
assertEquals(1, eligibleFiles.size());
assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1"));
}
@@ -98,8 +101,8 @@ public class TestDFSPathSelectorCommonMethods extends
HoodieSparkClientTestHarne
createBaseFile(basePath, "p1", "000", "foo2", 1);
createBaseFile(basePath, "p1", "000", "foo3", 1);
- List<StoragePathInfo> eligibleFiles =
- selector.listEligibleFiles(storage, inputPath, Long.MAX_VALUE);
+ List<FileStatus> eligibleFiles = selector.listEligibleFiles(
+ (FileSystem) storage.getFileSystem(), inputPath, Long.MAX_VALUE);
assertEquals(0, eligibleFiles.size());
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
index 439f01600be..509463c58aa 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -24,6 +24,7 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -215,8 +216,8 @@ public class TestDatePartitionPathSelector extends
HoodieSparkClientTestHarness
createParentDirsBeforeDatePartitions(root, generateRandomStrings(),
totalDepthBeforeDatePartitions, leafDirs);
createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
- List<String> paths = pathSelector.pruneDatePartitionPaths(context,
storage, root.toString(),
- LocalDate.parse(currentDate));
+ List<String> paths = pathSelector.pruneDatePartitionPaths(
+ context, (FileSystem) storage.getFileSystem(), root.toString(),
LocalDate.parse(currentDate));
assertEquals(expectedNumFiles, paths.size());
}
}