This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9bb8dcdf116 [HUDI-8092] Replace FileSystem and related classes to
dehadoop hudi-client-common (#11805)
9bb8dcdf116 is described below
commit 9bb8dcdf116f4d2b916aebe19adebb673131c43f
Author: Shawn Chang <[email protected]>
AuthorDate: Sun Sep 8 19:58:16 2024 -0700
[HUDI-8092] Replace FileSystem and related classes to dehadoop
hudi-client-common (#11805)
Co-authored-by: Shawn Chang <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../DirectMarkerTransactionManager.java | 7 +-
.../client/transaction/TransactionManager.java | 3 +-
.../lock/FileSystemBasedLockProvider.java | 43 +++----
.../hudi/client/transaction/lock/LockManager.java | 14 +-
.../index/bucket/ConsistentBucketIndexUtils.java | 59 +++++----
.../org/apache/hudi/io/HoodieAppendHandle.java | 12 +-
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 19 ++-
.../org/apache/hudi/io/HoodieCreateHandle.java | 4 +-
.../java/org/apache/hudi/io/HoodieIOHandle.java | 4 -
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 4 +-
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 4 +-
.../java/org/apache/hudi/table/HoodieTable.java | 5 +-
.../table/action/bootstrap/BootstrapUtils.java | 80 ++++++------
.../table/action/clean/CleanActionExecutor.java | 25 ++--
.../table/action/rollback/BaseRollbackHelper.java | 9 +-
.../rollback/ListingBasedRollbackStrategy.java | 142 +++++++++++----------
.../rollback/MarkerBasedRollbackStrategy.java | 10 +-
.../action/rollback/SerializablePathFilter.java | 26 ----
.../hudi/table/marker/DirectWriteMarkers.java | 28 ++--
...nsactionDirectMarkerBasedDetectionStrategy.java | 4 +-
.../org/apache/hudi/table/repair/RepairUtils.java | 9 --
.../table/upgrade/FourToFiveUpgradeHandler.java | 10 +-
.../hudi/table/upgrade/UpgradeDowngrade.java | 9 +-
.../org/apache/hudi/HoodieTestCommitGenerator.java | 27 ++--
.../FileSystemBasedLockProviderTestClass.java | 27 ++--
.../hudi/client/transaction/TestLockManager.java | 4 +-
.../apache/hudi/table/repair/TestRepairUtils.java | 3 +-
.../hudi/utils/HoodieWriterClientTestHarness.java | 3 +-
.../FlinkMergeAndReplaceHandleWithChangeLog.java | 3 +-
.../hudi/io/FlinkMergeHandleWithChangeLog.java | 3 +-
.../SparkBootstrapCommitActionExecutor.java | 10 +-
.../table/action/bootstrap/TestBootstrapUtils.java | 5 +-
.../table/functional/TestCleanActionExecutor.java | 40 +++---
.../java/org/apache/hudi/common/fs/FSUtils.java | 27 ++++
.../apache/hudi/hadoop/fs/SerializablePath.java | 71 -----------
.../hudi/storage/hadoop/HoodieHadoopStorage.java | 12 ++
.../org/apache/hudi/storage/HoodieStorage.java | 20 +++
.../hudi/io/storage/TestHoodieStorageBase.java | 35 +++++
.../org/apache/hudi/functional/TestBootstrap.java | 8 +-
.../apache/hudi/functional/TestOrcBootstrap.java | 11 +-
40 files changed, 394 insertions(+), 445 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index aa99ca63ede..00c27127a50 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -27,8 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
-
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.storage.HoodieStorage;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
@@ -41,8 +40,8 @@ import static
org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
public class DirectMarkerTransactionManager extends TransactionManager {
private final String filePath;
- public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem
fs, String partitionPath, String fileId) {
- super(new LockManager(config, fs, createUpdatedLockProps(config,
partitionPath, fileId)), config.isLockRequired());
+ public DirectMarkerTransactionManager(HoodieWriteConfig config,
HoodieStorage storage, String partitionPath, String fileId) {
+ super(new LockManager(config, storage, createUpdatedLockProps(config,
partitionPath, fileId)), config.isLockRequired());
this.filePath = partitionPath + "/" + fileId;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index c02ed4a171c..b725243fca0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ public class TransactionManager implements Serializable {
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
- this(new LockManager(config, (FileSystem) storage.getFileSystem()),
config.isLockRequired());
+ this(new LockManager(config, storage), config.isLockRequired());
}
protected TransactionManager(LockManager lockManager, boolean
isLockRequired) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
index 73f14355ca9..8c0bc8842b9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
@@ -32,19 +32,18 @@ import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLockException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StorageSchemes;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -65,8 +64,8 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemBasedLockProvider.class);
private static final String LOCK_FILE_NAME = "lock";
private final int lockTimeoutMinutes;
- private final transient FileSystem fs;
- private final transient Path lockFile;
+ private final transient HoodieStorage storage;
+ private final transient StoragePath lockFile;
protected LockConfiguration lockConfiguration;
private SimpleDateFormat sdf;
private LockInfo lockInfo;
@@ -81,13 +80,13 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
+ StoragePath.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
}
this.lockTimeoutMinutes =
lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
- this.lockFile = new Path(lockDirectory + StoragePath.SEPARATOR +
LOCK_FILE_NAME);
+ this.lockFile = new StoragePath(lockDirectory + StoragePath.SEPARATOR +
LOCK_FILE_NAME);
this.lockInfo = new LockInfo();
this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- this.fs = HadoopFSUtils.getFs(this.lockFile.toString(), configuration);
+ this.storage = HoodieStorageUtils.getStorage(this.lockFile.toString(),
configuration);
List<String> customSupportedFSs =
lockConfiguration.getConfig().getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(),
",", new ArrayList<>());
- if (!customSupportedFSs.contains(this.fs.getScheme()) &&
!StorageSchemes.isAtomicCreationSupported(this.fs.getScheme())) {
- throw new HoodieLockException("Unsupported scheme :" +
this.fs.getScheme() + ", since this fs can not support atomic creation");
+ if (!customSupportedFSs.contains(this.storage.getScheme()) &&
!StorageSchemes.isAtomicCreationSupported(this.storage.getScheme())) {
+ throw new HoodieLockException("Unsupported scheme :" +
this.storage.getScheme() + ", since this fs can not support atomic creation");
}
}
@@ -95,7 +94,7 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
public void close() {
synchronized (LOCK_FILE_NAME) {
try {
- fs.delete(this.lockFile, true);
+ storage.deleteFile(this.lockFile);
} catch (IOException e) {
throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
}
@@ -107,9 +106,9 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
try {
synchronized (LOCK_FILE_NAME) {
// Check whether lock is already expired, if so try to delete lock file
- if (fs.exists(this.lockFile)) {
+ if (storage.exists(this.lockFile)) {
if (checkIfExpired()) {
- fs.delete(this.lockFile, true);
+ storage.deleteFile(this.lockFile);
LOG.warn("Delete expired lock file: " + this.lockFile);
} else {
reloadCurrentOwnerLockInfo();
@@ -117,7 +116,7 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
}
}
acquireLock();
- return fs.exists(this.lockFile);
+ return storage.exists(this.lockFile);
}
} catch (IOException | HoodieIOException e) {
LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
@@ -129,8 +128,8 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
public void unlock() {
synchronized (LOCK_FILE_NAME) {
try {
- if (fs.exists(this.lockFile)) {
- fs.delete(this.lockFile, true);
+ if (storage.exists(this.lockFile)) {
+ storage.deleteFile(this.lockFile);
}
} catch (IOException io) {
throw new
HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
@@ -153,7 +152,7 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
return false;
}
try {
- long modificationTime =
fs.getFileStatus(this.lockFile).getModificationTime();
+ long modificationTime =
storage.getPathInfo(this.lockFile).getModificationTime();
if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes *
60 * 1000L) {
return true;
}
@@ -164,10 +163,10 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
}
private void acquireLock() {
- try (FSDataOutputStream fos = fs.create(this.lockFile, false)) {
- if (!fs.exists(this.lockFile)) {
+ try (OutputStream os = storage.create(this.lockFile, false)) {
+ if (!storage.exists(this.lockFile)) {
initLockInfo();
- fos.writeBytes(lockInfo.toString());
+ os.write(StringUtils.getUTF8Bytes(lockInfo.toString()));
}
} catch (IOException e) {
throw new
HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
@@ -181,8 +180,8 @@ public class FileSystemBasedLockProvider implements
LockProvider<String>, Serial
}
public void reloadCurrentOwnerLockInfo() {
- try (InputStream is = fs.open(this.lockFile)) {
- if (fs.exists(this.lockFile)) {
+ try (InputStream is = storage.open(this.lockFile)) {
+ if (storage.exists(this.lockFile)) {
this.currentOwnerLockInfo = FileIOUtils.readAsUTFString(is);
} else {
this.currentOwnerLockInfo = "";
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 39df3c3fe43..57ed6df45af 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -28,11 +28,9 @@ import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieLockException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,19 +56,19 @@ public class LockManager implements Serializable,
AutoCloseable {
private transient HoodieLockMetrics metrics;
private volatile LockProvider lockProvider;
- public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
- this(writeConfig, fs, writeConfig.getProps());
+ public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage) {
+ this(writeConfig, storage, writeConfig.getProps());
}
- public LockManager(HoodieWriteConfig writeConfig, FileSystem fs,
TypedProperties lockProps) {
+ public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage,
TypedProperties lockProps) {
this.writeConfig = writeConfig;
- this.storageConf = HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
+ this.storageConf = storage.getConf().newInstance();
this.lockConfiguration = new LockConfiguration(lockProps);
maxRetries =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
maxWaitTimeInMs =
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
- metrics = new HoodieLockMetrics(writeConfig, new HoodieHadoopStorage(fs));
+ metrics = new HoodieLockMetrics(writeConfig, storage);
lockRetryHelper = new RetryHelper<>(maxWaitTimeInMs, maxRetries,
maxWaitTimeInMs,
Arrays.asList(HoodieLockException.class, InterruptedException.class),
"acquire lock");
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index 453f08a486a..1cd5d110de5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -29,14 +29,11 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +42,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -58,7 +54,6 @@ import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHI
import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
/**
* Utilities class for consistent bucket index metadata management.
@@ -110,30 +105,31 @@ public class ConsistentBucketIndexUtils {
*/
public static Option<HoodieConsistentHashingMetadata>
loadMetadata(HoodieTable table, String partition) {
HoodieTableMetaClient metaClient = table.getMetaClient();
- Path metadataPath =
HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(),
partition);
- Path partitionPath =
HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePath().toString(),
partition);
+ StoragePath metadataPath =
FSUtils.constructAbsolutePath(metaClient.getHashingMetadataPath(), partition);
+ StoragePath partitionPath =
FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition);
try {
- Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
- String filename = fileStatus.getPath().getName();
+ Predicate<StoragePathInfo> hashingMetaCommitFilePredicate = pathInfo -> {
+ String filename = pathInfo.getPath().getName();
return
filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
};
- Predicate<FileStatus> hashingMetadataFilePredicate = fileStatus -> {
- String filename = fileStatus.getPath().getName();
+ Predicate<StoragePathInfo> hashingMetadataFilePredicate = pathInfo -> {
+ String filename = pathInfo.getPath().getName();
return filename.contains(HASHING_METADATA_FILE_SUFFIX);
};
- final FileStatus[] metaFiles =
- ((FileSystem)
metaClient.getStorage().getFileSystem()).listStatus(metadataPath);
- final TreeSet<String> commitMetaTss =
Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
+ final List<StoragePathInfo> metaFiles =
metaClient.getStorage().listDirectEntries(metadataPath);
+ final TreeSet<String> commitMetaTss =
metaFiles.stream().filter(hashingMetaCommitFilePredicate)
.map(commitFile ->
HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
.sorted()
.collect(Collectors.toCollection(TreeSet::new));
- final FileStatus[] hashingMetaFiles =
Arrays.stream(metaFiles).filter(hashingMetadataFilePredicate)
+ final List<StoragePathInfo> hashingMetaFiles =
metaFiles.stream().filter(hashingMetadataFilePredicate)
.sorted(Comparator.comparing(f -> f.getPath().getName()))
- .toArray(FileStatus[]::new);
+ .collect(Collectors.toList());
// max committed metadata file
final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null :
commitMetaTss.last();
// max updated metadata file
- FileStatus maxMetadataFile = hashingMetaFiles.length > 0 ?
hashingMetaFiles[hashingMetaFiles.length - 1] : null;
+ StoragePathInfo maxMetadataFile = hashingMetaFiles.isEmpty()
+ ? null
+ : hashingMetaFiles.get(hashingMetaFiles.size() - 1);
// If single file present in metadata and if its default file return it
if (maxMetadataFile != null &&
HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS))
{
return loadMetadataFromGivenFile(table, maxMetadataFile);
@@ -146,9 +142,9 @@ public class ConsistentBucketIndexUtils {
HoodieTimeline completedCommits =
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
// fix the in-consistency between un-committed and committed hashing
metadata files.
- List<FileStatus> fixed = new ArrayList<>();
- Arrays.stream(hashingMetaFiles).forEach(hashingMetaFile -> {
- Path path = hashingMetaFile.getPath();
+ List<StoragePathInfo> fixed = new ArrayList<>();
+ hashingMetaFiles.forEach(hashingMetaFile -> {
+ StoragePath path = hashingMetaFile.getPath();
String timestamp =
HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
if (maxCommitMetaFileTs != null &&
timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
// only fix the metadata with greater timestamp than max committed
timestamp
@@ -206,14 +202,14 @@ public class ConsistentBucketIndexUtils {
* Creates commit marker corresponding to hashing metadata file after post
commit clustering operation.
*
* @param table Hoodie table
- * @param fileStatus File for which commit marker should be created
+ * @param path File for which commit marker should be created
* @param partitionPath Partition path the file belongs to
* @throws IOException
*/
- private static void createCommitMarker(HoodieTable table, Path fileStatus,
Path partitionPath) throws IOException {
+ private static void createCommitMarker(HoodieTable table, StoragePath path,
StoragePath partitionPath) throws IOException {
HoodieStorage storage = table.getStorage();
- StoragePath fullPath = new StoragePath(convertToStoragePath(partitionPath),
- getTimestampFromFile(fileStatus.getName()) +
HASHING_METADATA_COMMIT_FILE_SUFFIX);
+ StoragePath fullPath = new StoragePath(partitionPath,
+ getTimestampFromFile(path.getName()) +
HASHING_METADATA_COMMIT_FILE_SUFFIX);
if (storage.exists(fullPath)) {
return;
}
@@ -236,11 +232,11 @@ public class ConsistentBucketIndexUtils {
* @param metaFile Hashing metadata file
* @return HoodieConsistentHashingMetadata object
*/
- private static Option<HoodieConsistentHashingMetadata>
loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
+ private static Option<HoodieConsistentHashingMetadata>
loadMetadataFromGivenFile(HoodieTable table, StoragePathInfo metaFile) {
if (metaFile == null) {
return Option.empty();
}
- try (InputStream is =
table.getStorage().open(convertToStoragePath(metaFile.getPath()))) {
+ try (InputStream is = table.getStorage().open(metaFile.getPath())) {
byte[] content = FileIOUtils.readAsByteArray(is);
return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
} catch (FileNotFoundException e) {
@@ -267,8 +263,8 @@ public class ConsistentBucketIndexUtils {
* @param partition Partition metadata file belongs to
* @return true if hashing metadata file is latest else false
*/
- private static boolean recommitMetadataFile(HoodieTable table, FileStatus
metaFile, String partition) {
- Path partitionPath = new
Path(FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(),
partition).toUri());
+ private static boolean recommitMetadataFile(HoodieTable table,
StoragePathInfo metaFile, String partition) {
+ StoragePath partitionPath =
FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition);
String timestamp = getTimestampFromFile(metaFile.getPath().getName());
if (table.getPendingCommitsTimeline().containsInstant(timestamp)) {
return false;
@@ -279,7 +275,10 @@ public class ConsistentBucketIndexUtils {
}
HoodieConsistentHashingMetadata hoodieConsistentHashingMetadata =
hoodieConsistentHashingMetadataOption.get();
- Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile ->
hoodieConsistentHashingMetadata.getNodes().stream().anyMatch(node ->
node.getFileIdPrefix().equals(hoodieBaseFile));
+ Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile ->
+ hoodieConsistentHashingMetadata.getNodes()
+ .stream()
+ .anyMatch(node -> node.getFileIdPrefix().equals(hoodieBaseFile));
if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
.map(fileIdPrefix ->
FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate))
{
try {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 30bcc767274..cb0c7dd283f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -56,12 +56,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -329,9 +327,9 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
}
private String makeFilePath(HoodieLogFile logFile) {
- return partitionPath.length() == 0
- ? new Path(logFile.getFileName()).toString()
- : new Path(partitionPath, logFile.getFileName()).toString();
+ return partitionPath.isEmpty()
+ ? new StoragePath(logFile.getFileName()).toString()
+ : new StoragePath(partitionPath, logFile.getFileName()).toString();
}
private void resetWriteCounts() {
@@ -525,7 +523,9 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
// TODO we can actually deduce file size purely from AppendResult (based
on offset and size
// of the appended block)
for (WriteStatus status : statuses) {
- long logFileSize = HadoopFSUtils.getFileSize(fs, new
Path(config.getBasePath(), status.getStat().getPath()));
+ long logFileSize = storage.getPathInfo(
+ new StoragePath(config.getBasePath(), status.getStat().getPath()))
+ .getLength();
status.getStat().setFileSizeInBytes(logFileSize);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index d629b8be0ea..5677f4188bd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -39,14 +39,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import java.io.Closeable;
import java.io.IOException;
@@ -72,7 +71,7 @@ public class HoodieCDCLogger implements Closeable {
private final String partitionPath;
- private final FileSystem fs;
+ private final HoodieStorage storage;
private final Schema dataSchema;
@@ -102,14 +101,14 @@ public class HoodieCDCLogger implements Closeable {
private final SizeEstimator<HoodieAvroPayload> sizeEstimator;
- private final List<Path> cdcAbsPaths;
+ private final List<StoragePath> cdcAbsPaths;
public HoodieCDCLogger(
String commitTime,
HoodieWriteConfig config,
HoodieTableConfig tableConfig,
String partitionPath,
- FileSystem fs,
+ HoodieStorage storage,
Schema schema,
HoodieLogFormat.Writer cdcWriter,
long maxInMemorySizeInBytes) {
@@ -119,7 +118,7 @@ public class HoodieCDCLogger implements Closeable {
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: tableConfig.getRecordKeyFieldProp();
this.partitionPath = partitionPath;
- this.fs = fs;
+ this.storage = storage;
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
this.cdcWriter = cdcWriter;
this.cdcSupplementalLoggingMode =
tableConfig.cdcSupplementalLoggingMode();
@@ -196,7 +195,7 @@ public class HoodieCDCLogger implements Closeable {
HoodieLogBlock block = new HoodieCDCDataBlock(records,
cdcDataBlockHeader, keyField);
AppendResult result =
cdcWriter.appendBlocks(Collections.singletonList(block));
- Path cdcAbsPath = new Path(result.logFile().getPath().toUri());
+ StoragePath cdcAbsPath = result.logFile().getPath();
if (!cdcAbsPaths.contains(cdcAbsPath)) {
cdcAbsPaths.add(cdcAbsPath);
}
@@ -213,10 +212,10 @@ public class HoodieCDCLogger implements Closeable {
public Map<String, Long> getCDCWriteStats() {
Map<String, Long> stats = new HashMap<>();
try {
- for (Path cdcAbsPath : cdcAbsPaths) {
+ for (StoragePath cdcAbsPath : cdcAbsPaths) {
String cdcFileName = cdcAbsPath.getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ?
cdcFileName : partitionPath + "/" + cdcFileName;
- stats.put(cdcPath, HadoopFSUtils.getFileSize(fs, cdcAbsPath));
+ stats.put(cdcPath, storage.getPathInfo(cdcAbsPath).getLength());
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to get cdc write stat", e);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index dc61749bf93..12406927ae6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -32,14 +32,12 @@ import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -245,7 +243,7 @@ public class HoodieCreateHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
stat.setPath(new StoragePath(config.getBasePath()), path);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
- long fileSize = HadoopFSUtils.getFileSize(fs, new Path(path.toUri()));
+ long fileSize = storage.getPathInfo(path).getLength();
stat.setTotalWriteBytes(fileSize);
stat.setFileSizeInBytes(fileSize);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
index 6865a6ac653..28c1dfbca56 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java
@@ -24,14 +24,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.FileSystem;
-
public abstract class HoodieIOHandle<T, I, K, O> {
protected final String instantTime;
protected final HoodieWriteConfig config;
protected final HoodieTable<T, I, K, O> hoodieTable;
- protected FileSystem fs;
protected HoodieStorage storage;
HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime,
HoodieTable<T, I, K, O> hoodieTable) {
@@ -39,7 +36,6 @@ public abstract class HoodieIOHandle<T, I, K, O> {
this.config = config;
this.hoodieTable = hoodieTable;
this.storage = getStorage();
- this.fs = (FileSystem) storage.getFileSystem();
}
public abstract HoodieStorage getStorage();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index b5dd3374d27..1bf6f6b0138 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -42,7 +42,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -52,7 +51,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -439,7 +437,7 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
fileWriter.close();
fileWriter = null;
- long fileSizeInBytes = HadoopFSUtils.getFileSize(fs, new
Path(newFilePath.toUri()));
+ long fileSizeInBytes = storage.getPathInfo(newFilePath).getLength();
HoodieWriteStat stat = writeStatus.getStat();
stat.setTotalWriteBytes(fileSizeInBytes);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index fba72310513..08be71288f9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -60,7 +60,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O>
extends HoodieMergeHandl
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
- fs,
+ storage,
getWriterSchema(),
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -78,7 +78,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O>
extends HoodieMergeHandl
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
- fs,
+ storage,
getWriterSchema(),
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index a70f5e7edba..89b58a9f116 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -86,7 +86,6 @@ import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -774,7 +773,9 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
if (!invalidDataPaths.isEmpty()) {
LOG.info("Removing duplicate files created due to task retries before
committing. Paths=" + invalidDataPaths);
Map<String, List<Pair<String, String>>> invalidPathsByPartition =
invalidDataPaths.stream()
- .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(),
new Path(basePath, dp).toString()))
+ .map(dp ->
+ Pair.of(new StoragePath(basePath, dp).getParent().toString(),
+ new StoragePath(basePath, dp).toString()))
.collect(Collectors.groupingBy(Pair::getKey));
// Ensure all files in delete list is actually present. This is
mandatory for an eventually consistent FS.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
index 4b12e72938d..8a73136d7e6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
@@ -20,17 +20,16 @@ package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
+import org.apache.hudi.storage.StoragePathInfo;
import java.io.IOException;
import java.util.ArrayList;
@@ -45,53 +44,52 @@ public class BootstrapUtils {
/**
* Returns leaf folders with files under a path.
* @param baseFileFormat Hoodie base file format
- * @param fs File System
+ * @param storage Hoodie Storage
* @param context HoodieEngineContext
* @return list of partition paths with files under them.
* @throws IOException
*/
- public static List<Pair<String, List<HoodieFileStatus>>>
getAllLeafFoldersWithFiles(HoodieFileFormat baseFileFormat,
-
FileSystem fs, String basePathStr, HoodieEngineContext context) throws
IOException {
- final Path basePath = new Path(basePathStr);
+ public static List<Pair<String, List<HoodieFileStatus>>>
getAllLeafFoldersWithFiles(
+ HoodieFileFormat baseFileFormat,
+ HoodieStorage storage,
+ String basePathStr,
+ HoodieEngineContext context) throws IOException {
+ final StoragePath basePath = new StoragePath(basePathStr);
final String baseFileExtension = baseFileFormat.getFileExtension();
final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
final Map<String, List<HoodieFileStatus>> partitionToFiles = new
HashMap<>();
- PathFilter filePathFilter = getFilePathFilter(baseFileExtension);
- PathFilter metaPathFilter = getExcludeMetaPathFilter();
+ StoragePathFilter filePathFilter = getFilePathFilter(baseFileExtension);
+ StoragePathFilter metaPathFilter = getExcludeMetaPathFilter();
- FileStatus[] topLevelStatuses = fs.listStatus(basePath);
+ List<StoragePathInfo> topLevelPathInfos =
storage.listDirectEntries(basePath);
List<String> subDirectories = new ArrayList<>();
List<Pair<HoodieFileStatus, Pair<Integer, String>>> result = new
ArrayList<>();
- for (FileStatus topLevelStatus: topLevelStatuses) {
- if (topLevelStatus.isFile() &&
filePathFilter.accept(topLevelStatus.getPath())) {
- String relativePath = HadoopFSUtils.getRelativePartitionPath(basePath,
topLevelStatus.getPath().getParent());
+ for (StoragePathInfo topLevelPathInfo: topLevelPathInfos) {
+ if (topLevelPathInfo.isFile() &&
filePathFilter.accept(topLevelPathInfo.getPath())) {
+ String relativePath = FSUtils.getRelativePartitionPath(basePath,
topLevelPathInfo.getPath().getParent());
Integer level = (int) relativePath.chars().filter(ch -> ch ==
'/').count();
- HoodieFileStatus hoodieFileStatus =
HadoopFSUtils.fromFileStatus(topLevelStatus);
+ HoodieFileStatus hoodieFileStatus =
FSUtils.fromPathInfo(topLevelPathInfo);
result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
- } else if (topLevelStatus.isDirectory() &&
metaPathFilter.accept(topLevelStatus.getPath())) {
- subDirectories.add(topLevelStatus.getPath().toString());
+ } else if (topLevelPathInfo.isDirectory() &&
metaPathFilter.accept(topLevelPathInfo.getPath())) {
+ subDirectories.add(topLevelPathInfo.getPath().toString());
}
}
- if (subDirectories.size() > 0) {
+ if (!subDirectories.isEmpty()) {
result.addAll(context.flatMap(subDirectories, directory -> {
- PathFilter pathFilter = getFilePathFilter(baseFileExtension);
- Path path = new Path(directory);
- FileSystem fileSystem = HadoopFSUtils.getFs(path,
HadoopFSUtils.getStorageConf());
- RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path,
true);
- List<Pair<HoodieFileStatus, Pair<Integer, String>>> res = new
ArrayList<>();
- while (itr.hasNext()) {
- FileStatus status = itr.next();
- if (pathFilter.accept(status.getPath())) {
- String relativePath = HadoopFSUtils.getRelativePartitionPath(new
Path(basePathStr), status.getPath().getParent());
- Integer level = (int) relativePath.chars().filter(ch -> ch ==
'/').count();
- HoodieFileStatus hoodieFileStatus =
HadoopFSUtils.fromFileStatus(status);
- res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
- }
- }
- return res.stream();
+ StoragePathFilter pathFilter = getFilePathFilter(baseFileExtension);
+ StoragePath path = new StoragePath(directory);
+ HoodieStorage tmpStorage = HoodieStorageUtils.getStorage(path,
HadoopFSUtils.getStorageConf());
+ return tmpStorage.listFiles(path).stream()
+ .filter(pathInfo -> pathFilter.accept(pathInfo.getPath()))
+ .map(pathInfo -> {
+ String relativePath = FSUtils.getRelativePartitionPath(basePath,
pathInfo.getPath().getParent());
+ Integer level = (int) relativePath.chars().filter(ch -> ch ==
'/').count();
+ HoodieFileStatus hoodieFileStatus =
FSUtils.fromPathInfo(pathInfo);
+ return Pair.of(hoodieFileStatus, Pair.of(level, relativePath));
+ });
}, subDirectories.size()));
}
@@ -118,14 +116,12 @@ public class BootstrapUtils {
.map(d -> Pair.of(d,
partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
}
- private static PathFilter getFilePathFilter(String baseFileExtension) {
- return (path) -> {
- return path.getName().endsWith(baseFileExtension);
- };
+ private static StoragePathFilter getFilePathFilter(String baseFileExtension)
{
+ return path -> path.getName().endsWith(baseFileExtension);
}
- private static PathFilter getExcludeMetaPathFilter() {
+ private static StoragePathFilter getExcludeMetaPathFilter() {
// Avoid listing and including any folders under the meta folder
- return (path) ->
!path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME);
+ return path ->
!path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index a0f3fda6e0b..c44421e9ce2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -36,11 +36,11 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,16 +73,17 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
this.skipLocking = skipLocking;
}
- private static Boolean deleteFileAndGetResult(FileSystem fs, String
deletePathStr) throws IOException {
- Path deletePath = new Path(deletePathStr);
+ private static Boolean deleteFileAndGetResult(HoodieStorage storage, String
deletePathStr) throws IOException {
+ StoragePath deletePath = new StoragePath(deletePathStr);
LOG.debug("Working on delete path :" + deletePath);
try {
- boolean isDirectory = fs.isDirectory(deletePath);
- boolean deleteResult = fs.delete(deletePath, isDirectory);
+ boolean deleteResult = storage.getPathInfo(deletePath).isDirectory()
+ ? storage.deleteDirectory(deletePath)
+ : storage.deleteFile(deletePath);
if (deleteResult) {
LOG.debug("Cleaned file at path :" + deletePath);
} else {
- if (fs.exists(deletePath)) {
+ if (storage.exists(deletePath)) {
throw new HoodieIOException("Failed to delete path during clean
execution " + deletePath);
} else {
LOG.debug("Already cleaned up file at path :" + deletePath);
@@ -97,16 +98,15 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
private static Stream<Pair<String, PartitionCleanStat>>
deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo,
HoodieTable table) {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
- FileSystem fs = (FileSystem) table.getStorage().getFileSystem();
+ HoodieStorage storage = table.getStorage();
cleanFileInfo.forEachRemaining(partitionDelFileTuple -> {
String partitionPath = partitionDelFileTuple.getLeft();
- Path deletePath = new
Path(partitionDelFileTuple.getRight().getFilePath());
+ StoragePath deletePath = new
StoragePath(partitionDelFileTuple.getRight().getFilePath());
String deletePathStr = deletePath.toString();
boolean deletedFileResult = false;
try {
- deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
-
+ deletedFileResult = deleteFileAndGetResult(storage, deletePathStr);
} catch (IOException e) {
LOG.error("Delete file failed: " + deletePathStr, e);
}
@@ -158,8 +158,7 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
partitionsToBeDeleted.forEach(entry -> {
try {
if (!isNullOrEmpty(entry)) {
- deleteFileAndGetResult((FileSystem)
table.getStorage().getFileSystem(),
- table.getMetaClient().getBasePath() + "/" + entry);
+ deleteFileAndGetResult(table.getStorage(),
table.getMetaClient().getBasePath() + "/" + entry);
}
} catch (IOException e) {
LOG.warn("Partition deletion failed " + entry);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index 2e4e39f26b7..9d2f727aeb9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -33,12 +33,9 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,12 +187,12 @@ public class BaseRollbackHelper implements Serializable {
return filesToBeDeleted.stream().map(fileToDelete -> {
String basePath = metaClient.getBasePath().toString();
try {
- Path fullDeletePath = new Path(fileToDelete);
- String partitionPath = HadoopFSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
+ StoragePath fullDeletePath = new StoragePath(fileToDelete);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
StoragePath(basePath), fullDeletePath.getParent());
boolean isDeleted = true;
if (doDelete) {
try {
- isDeleted = ((FileSystem)
metaClient.getStorage().getFileSystem()).delete(fullDeletePath);
+ isDeleted = metaClient.getStorage().deleteFile(fullDeletePath);
} catch (FileNotFoundException e) {
// if first rollback attempt failed and retried again, chances
that some files are already deleted.
isDeleted = true;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index af3a88643a2..b2100867694 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -32,28 +32,26 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.MetadataConversionUtils.getHoodieCommitMetadata;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
import static
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
/**
@@ -111,10 +109,10 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
return context.flatMap(partitionPaths, partitionPath -> {
List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
- Supplier<FileStatus[]> filesToDelete = () -> {
+ Supplier<List<StoragePathInfo>> filesToDelete = () -> {
try {
return fetchFilesFromInstant(instantToRollback, partitionPath,
metaClient.getBasePath().toString(), baseFileExtension,
- (FileSystem) metaClient.getStorage().getFileSystem(),
+ metaClient.getStorage(),
commitMetadataOptional, isCommitMetadataCompleted, tableType);
} catch (IOException e) {
throw new HoodieIOException("Fetching files to delete error", e);
@@ -149,8 +147,7 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
// and not corresponding base commit log files created with
this as baseCommit since updates would
// have been written to the log files.
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
- listBaseFilesToBeDeleted(instantToRollback.getTimestamp(),
baseFileExtension, partitionPath,
- (FileSystem)
metaClient.getStorage().getFileSystem())));
+ listBaseFilesToBeDeleted(instantToRollback.getTimestamp(),
baseFileExtension, partitionPath, metaClient.getStorage())));
} else {
// if this is part of a restore operation, we should
rollback/delete entire file slice.
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
@@ -182,34 +179,32 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
}
}
- private FileStatus[] listAllFilesSinceCommit(
- String commit,
- String baseFileExtension,
- String partitionPath,
- HoodieTableMetaClient metaClient) throws IOException {
+ private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
+ String
baseFileExtension,
+ String partitionPath,
+ HoodieTableMetaClient
metaClient) throws IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
CompletionTimeQueryView completionTimeQueryView = new
CompletionTimeQueryView(metaClient);
- PathFilter filter = (path) -> {
+ StoragePathFilter filter = (path) -> {
if (path.toString().contains(baseFileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return HoodieTimeline.compareTimestamps(commit,
HoodieTimeline.LESSER_THAN_OR_EQUALS,
fileCommitTime);
- } else if (HadoopFSUtils.isLogFile(path)) {
- String fileCommitTime =
FSUtils.getDeltaCommitTimeFromLogPath(convertToStoragePath(path));
+ } else if (FSUtils.isLogFile(path)) {
+ String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
return completionTimeQueryView.isSlicedAfterOrOn(commit,
fileCommitTime);
}
return false;
};
- return ((FileSystem) metaClient.getStorage().getFileSystem())
-
.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(),
partitionPath),
- filter);
+ return metaClient.getStorage()
+ .listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath), filter);
}
@NotNull
- private List<HoodieRollbackRequest> getHoodieRollbackRequests(String
partitionPath, FileStatus[] filesToDeletedStatus) {
- return Arrays.stream(filesToDeletedStatus)
- .map(fileStatus -> {
- String dataFileToBeDeleted = fileStatus.getPath().toString();
+ private List<HoodieRollbackRequest> getHoodieRollbackRequests(String
partitionPath, List<StoragePathInfo> filesToDeletedStatus) {
+ return filesToDeletedStatus.stream()
+ .map(pathInfo -> {
+ String dataFileToBeDeleted = pathInfo.getPath().toString();
return formatDeletePath(dataFileToBeDeleted);
})
.map(s -> new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING, Collections.singletonList(s), Collections.emptyMap()))
@@ -221,49 +216,56 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
return path.substring(path.indexOf(":") + 1);
}
- private FileStatus[] listBaseFilesToBeDeleted(String commit, String
basefileExtension, String partitionPath,
- FileSystem fs) throws
IOException {
+ private List<StoragePathInfo> listBaseFilesToBeDeleted(String commit,
+ String
basefileExtension,
+ String partitionPath,
+ HoodieStorage
storage) throws IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
- PathFilter filter = (path) -> {
+ StoragePathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
- return
fs.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(),
partitionPath), filter);
+ return
storage.listDirectEntries(FSUtils.constructAbsolutePath(config.getBasePath(),
partitionPath), filter);
}
- private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback,
String partitionPath, String basePath,
- String baseFileExtension,
FileSystem fs,
- Option<HoodieCommitMetadata>
commitMetadataOptional,
- Boolean isCommitMetadataCompleted,
- HoodieTableType tableType) throws
IOException {
+ private List<StoragePathInfo> fetchFilesFromInstant(HoodieInstant
instantToRollback,
+ String partitionPath,
String basePath,
+ String
baseFileExtension, HoodieStorage storage,
+
Option<HoodieCommitMetadata> commitMetadataOptional,
+ Boolean
isCommitMetadataCompleted,
+ HoodieTableType
tableType) throws IOException {
// go w/ commit metadata only for COW table. for MOR, we need to get
associated log files when commit corresponding to base file is rolledback.
if (isCommitMetadataCompleted && tableType ==
HoodieTableType.COPY_ON_WRITE) {
return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(),
- baseFileExtension, fs);
+ baseFileExtension, storage);
} else {
- return fetchFilesFromListFiles(instantToRollback, partitionPath,
basePath, baseFileExtension, fs);
+ return fetchFilesFromListFiles(instantToRollback, partitionPath,
basePath, baseFileExtension, storage);
}
}
- private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback, String partitionPath,
- String basePath,
HoodieCommitMetadata commitMetadata,
- String baseFileExtension,
FileSystem fs)
- throws IOException {
- SerializablePathFilter pathFilter =
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
- Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata,
partitionPath);
-
- return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
- try {
- return fs.exists(entry);
- } catch (Exception e) {
- LOG.error("Exists check failed for " + entry.toString(), e);
- }
- // if any Exception is thrown, do not ignore. let's try to add the file
of interest to be deleted. we can't miss any files to be rolled back.
- return true;
- }).toArray(Path[]::new), pathFilter);
+ private List<StoragePathInfo> fetchFilesFromCommitMetadata(HoodieInstant
instantToRollback,
+ String
partitionPath,
+ String basePath,
+
HoodieCommitMetadata commitMetadata,
+ String
baseFileExtension,
+ HoodieStorage
storage) throws IOException {
+ StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
+ instantToRollback.getTimestamp());
+ List<StoragePath> filePaths = getFilesFromCommitMetadata(basePath,
commitMetadata, partitionPath)
+ .filter(entry -> {
+ try {
+ return storage.exists(entry);
+ } catch (Exception e) {
+ LOG.error("Exists check failed for " + entry.toString(), e);
+ }
+ // if any Exception is thrown, do not ignore. let's try to add the
file of interest to be deleted. we can't miss any files to be rolled back.
+ return true;
+ }).collect(Collectors.toList());
+
+ return storage.listDirectEntries(filePaths, pathFilter);
}
/**
@@ -272,17 +274,19 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
* @param partitionPath
* @param basePath
* @param baseFileExtension
- * @param fs
+ * @param storage
* @return
* @throws IOException
*/
- private FileStatus[] fetchFilesFromListFiles(HoodieInstant
instantToRollback, String partitionPath, String basePath,
- String baseFileExtension,
FileSystem fs)
- throws IOException {
- SerializablePathFilter pathFilter =
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
- Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
-
- return fs.listStatus(filePaths, pathFilter);
+ private List<StoragePathInfo> fetchFilesFromListFiles(HoodieInstant
instantToRollback,
+ String partitionPath,
+ String basePath,
+ String
baseFileExtension,
+ HoodieStorage storage)
throws IOException {
+ StoragePathFilter pathFilter = getPathFilter(baseFileExtension,
instantToRollback.getTimestamp());
+ List<StoragePath> filePaths = listFilesToBeDeleted(basePath,
partitionPath);
+
+ return storage.listDirectEntries(filePaths, pathFilter);
}
private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
@@ -291,24 +295,26 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
&&
!WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType());
}
- private static Path[] listFilesToBeDeleted(String basePath, String
partitionPath) {
- return new Path[]
{HadoopFSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath)};
+ private static List<StoragePath> listFilesToBeDeleted(String basePath,
String partitionPath) {
+ return Collections.singletonList(FSUtils.constructAbsolutePath(basePath,
partitionPath));
}
- private static Path[] getFilesFromCommitMetadata(String basePath,
HoodieCommitMetadata commitMetadata, String partitionPath) {
+ private static Stream<StoragePath> getFilesFromCommitMetadata(String
basePath,
+
HoodieCommitMetadata commitMetadata,
+ String
partitionPath) {
List<String> fullPaths =
commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
- return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+ return fullPaths.stream().map(StoragePath::new);
}
@NotNull
- private static SerializablePathFilter getSerializablePathFilter(String
basefileExtension, String commit) {
+ private static StoragePathFilter getPathFilter(String basefileExtension,
String commit) {
return (path) -> {
if (path.toString().endsWith(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
- } else if (HadoopFSUtils.isLogFile(path)) {
+ } else if (FSUtils.isLogFile(path)) {
// Since the baseCommitTime is the only commit for new log files, it's
okay here
- String fileCommitTime =
FSUtils.getDeltaCommitTimeFromLogPath(convertToStoragePath(path));
+ String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(path);
return commit.equals(fileCommitTime);
}
return false;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 84fe13c9a39..8a63d6865ac 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -20,16 +20,16 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,9 +71,9 @@ public class MarkerBasedRollbackStrategy<T, I, K, O>
implements BaseRollbackPlan
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
String filePathStr = WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path filePath = new Path(basePath, filePathStr);
- String partitionPath = HadoopFSUtils.getRelativePartitionPath(new
Path(basePath), filePath.getParent());
- String fileId = HadoopFSUtils.getFileIdFromFilePath(filePath);
+ StoragePath filePath = new StoragePath(basePath, filePathStr);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
StoragePath(basePath), filePath.getParent());
+ String fileId = FSUtils.getFileIdFromFilePath(filePath);
switch (type) {
case MERGE:
case CREATE:
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
deleted file mode 100644
index e2affdf5ca8..00000000000
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.rollback;
-
-import org.apache.hadoop.fs.PathFilter;
-
-import java.io.Serializable;
-
-public interface SerializablePathFilter extends PathFilter, Serializable {
-}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index f7ed9370a95..86a0c6f0aea 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -32,18 +32,13 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,23 +106,18 @@ public class DirectWriteMarkers extends WriteMarkers {
}
}
- if (subDirectories.size() > 0) {
+ if (!subDirectories.isEmpty()) {
parallelism = Math.min(subDirectories.size(), parallelism);
StorageConfiguration<?> storageConf = storage.getConf();
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker
files for all created, merged paths");
dataFiles.addAll(context.flatMap(subDirectories, directory -> {
- Path path = new Path(directory);
- FileSystem fileSystem = HadoopFSUtils.getFs(path, storageConf);
- RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path,
true);
- List<String> result = new ArrayList<>();
- while (itr.hasNext()) {
- FileStatus status = itr.next();
- String pathStr = status.getPath().toString();
- if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) &&
!pathStr.endsWith(IOType.APPEND.name())) {
- result.add(translateMarkerToDataPath(pathStr));
- }
- }
- return result.stream();
+ StoragePath path = new StoragePath(directory);
+ HoodieStorage storage = HoodieStorageUtils.getStorage(path,
storageConf);
+ return storage.listFiles(path).stream()
+ .map(pathInfo -> pathInfo.getPath().toString())
+ .filter(pathStr ->
pathStr.contains(HoodieTableMetaClient.MARKER_EXTN)
+ && !pathStr.endsWith(IOType.APPEND.name()))
+ .map(this::translateMarkerToDataPath);
}, parallelism));
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
index 3d984ba781c..d09cc74f14e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java
@@ -24,7 +24,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +47,7 @@ public class
SimpleTransactionDirectMarkerBasedDetectionStrategy
@Override
public void detectAndResolveConflictIfNecessary() throws
HoodieEarlyConflictDetectionException {
DirectMarkerTransactionManager txnManager =
- new DirectMarkerTransactionManager((HoodieWriteConfig) config,
- (FileSystem) storage.getFileSystem(), partitionPath, fileId);
+ new DirectMarkerTransactionManager((HoodieWriteConfig) config,
storage, partitionPath, fileId);
try {
// Need to do transaction before create marker file when using early
conflict detection
txnManager.beginTransaction(instantTime);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
index 17def321f4a..f4eb9ef31cf 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
@@ -31,10 +31,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hadoop.fs.PathFilter;
-
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -168,10 +165,4 @@ public final class RepairUtils {
return Collections.emptyList();
}
}
-
- /**
- * Serializable path filter class for Spark job.
- */
- public interface SerializablePathFilter extends PathFilter, Serializable {
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
index 592563b4991..697713dbceb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToFiveUpgradeHandler.java
@@ -24,11 +24,9 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +74,7 @@ public class FourToFiveUpgradeHandler implements
UpgradeHandler {
}
}
- private boolean hasDefaultPartitionPath(HoodieWriteConfig config,
HoodieTable table) throws IOException {
+ private boolean hasDefaultPartitionPath(HoodieWriteConfig config,
HoodieTable table) throws IOException {
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
if (!tableConfig.isTablePartitioned()) {
return false;
@@ -88,7 +86,7 @@ public class FourToFiveUpgradeHandler implements
UpgradeHandler {
String[] partitions = tableConfig.getPartitionFields().get();
checkPartitionPath = partitions[0] + "=" +
DEPRECATED_DEFAULT_PARTITION_PATH;
}
- FileSystem fs = HadoopFSUtils.getFs(config.getBasePath(),
table.getStorageConf());
- return fs.exists(new Path(config.getBasePath() + "/" +
checkPartitionPath));
+
+ return table.getStorage().exists(new StoragePath(config.getBasePath() +
"/" + checkPartitionPath));
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 096c4e8135f..2d4e8abf89c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -30,7 +30,6 @@ import org.apache.hudi.metadata.HoodieMetadataWriteUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +48,8 @@ public class UpgradeDowngrade {
private HoodieTableMetaClient metaClient;
protected HoodieWriteConfig config;
protected HoodieEngineContext context;
- private Path updatedPropsFilePath;
- private Path propsFilePath;
+ private StoragePath updatedPropsFilePath;
+ private StoragePath propsFilePath;
public UpgradeDowngrade(
HoodieTableMetaClient metaClient, HoodieWriteConfig config,
HoodieEngineContext context,
@@ -58,8 +57,8 @@ public class UpgradeDowngrade {
this.metaClient = metaClient;
this.config = config;
this.context = context;
- this.updatedPropsFilePath = new Path(metaClient.getMetaPath().toString(),
HOODIE_UPDATED_PROPERTY_FILE);
- this.propsFilePath = new Path(metaClient.getMetaPath().toString(),
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+ this.updatedPropsFilePath = new StoragePath(metaClient.getMetaPath(),
HOODIE_UPDATED_PROPERTY_FILE);
+ this.propsFilePath = new StoragePath(metaClient.getMetaPath(),
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
index 0356639bdeb..631aeebd58e 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
@@ -32,11 +32,11 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,7 +154,7 @@ public class HoodieTestCommitGenerator {
fileInfoList.forEach(fileInfo -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partitionPath);
- writeStat.setPath(new Path(partitionPath,
fileInfo.getValue()).toString());
+ writeStat.setPath(new StoragePath(partitionPath,
fileInfo.getValue()).toString());
writeStat.setFileId(fileInfo.getKey());
// Below are dummy values
writeStat.setTotalWriteBytes(10000);
@@ -171,8 +171,9 @@ public class HoodieTestCommitGenerator {
public static void createCommitFileWithMetadata(
String basePath, StorageConfiguration<?> storageConf,
String filename, byte[] content) throws IOException {
- Path commitFilePath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename);
- try (OutputStream os = HadoopFSUtils.getFs(basePath,
storageConf).create(commitFilePath, true)) {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(basePath,
storageConf);
+ StoragePath commitFilePath = new StoragePath(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename);
+ try (OutputStream os = storage.create(commitFilePath, true)) {
os.write(content);
}
}
@@ -180,14 +181,14 @@ public class HoodieTestCommitGenerator {
public static void createDataFile(
String basePath, StorageConfiguration<?> storageConf,
String partitionPath, String filename) throws IOException {
- FileSystem fs = HadoopFSUtils.getFs(basePath, storageConf);
- Path filePath = new Path(new Path(basePath, partitionPath), filename);
- Path parent = filePath.getParent();
- if (!fs.exists(parent)) {
- fs.mkdirs(parent);
+ HoodieStorage storage = HoodieStorageUtils.getStorage(basePath,
storageConf);
+ StoragePath filePath = new StoragePath(new StoragePath(basePath,
partitionPath), filename);
+ StoragePath parent = filePath.getParent();
+ if (!storage.exists(parent)) {
+ storage.createDirectory(parent);
}
- if (!fs.exists(filePath)) {
- fs.create(filePath);
+ if (!storage.exists(filePath)) {
+ storage.create(filePath);
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
index 2df166c1c71..30d239677f9 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
@@ -22,15 +22,14 @@ import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLockException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
+import org.apache.hudi.storage.StoragePath;
import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
@@ -47,8 +46,8 @@ public class FileSystemBasedLockProviderTestClass implements
LockProvider<String
private final int retryMaxCount;
private final int retryWaitTimeMs;
- private transient FileSystem fs;
- private transient Path lockFile;
+ private transient HoodieStorage storage;
+ private transient StoragePath lockFile;
protected LockConfiguration lockConfiguration;
public FileSystemBasedLockProviderTestClass(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> configuration) {
@@ -56,15 +55,15 @@ public class FileSystemBasedLockProviderTestClass
implements LockProvider<String
final String lockDirectory =
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
this.retryWaitTimeMs =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY);
this.retryMaxCount =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY);
- this.lockFile = new Path(lockDirectory + "/" + LOCK);
- this.fs = HadoopFSUtils.getFs(this.lockFile.toString(), configuration);
+ this.lockFile = new StoragePath(lockDirectory + "/" + LOCK);
+ this.storage = HoodieStorageUtils.getStorage(this.lockFile.toString(),
configuration);
}
@Override
public void close() {
synchronized (LOCK) {
try {
- fs.delete(this.lockFile, true);
+ storage.deleteDirectory(this.lockFile);
} catch (IOException e) {
throw new HoodieLockException("Unable to release lock: " + getLock(),
e);
}
@@ -76,7 +75,7 @@ public class FileSystemBasedLockProviderTestClass implements
LockProvider<String
try {
int numRetries = 0;
synchronized (LOCK) {
- while (fs.exists(this.lockFile)) {
+ while (storage.exists(this.lockFile)) {
LOCK.wait(retryWaitTimeMs);
numRetries++;
if (numRetries > retryMaxCount) {
@@ -84,7 +83,7 @@ public class FileSystemBasedLockProviderTestClass implements
LockProvider<String
}
}
acquireLock();
- return fs.exists(this.lockFile);
+ return storage.exists(this.lockFile);
}
} catch (IOException | InterruptedException e) {
throw new HoodieLockException("Failed to acquire lock: " + getLock(), e);
@@ -95,8 +94,8 @@ public class FileSystemBasedLockProviderTestClass implements
LockProvider<String
public void unlock() {
synchronized (LOCK) {
try {
- if (fs.exists(this.lockFile)) {
- fs.delete(this.lockFile, true);
+ if (storage.exists(this.lockFile)) {
+ storage.deleteDirectory(this.lockFile);
}
} catch (IOException io) {
throw new HoodieIOException("Unable to delete lock " + getLock() + "on
disk", io);
@@ -111,7 +110,7 @@ public class FileSystemBasedLockProviderTestClass
implements LockProvider<String
private void acquireLock() {
try {
- fs.create(this.lockFile, false).close();
+ storage.create(this.lockFile, false).close();
} catch (IOException e) {
throw new HoodieIOException("Failed to acquire lock: " + getLock(), e);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
index 398ce60f811..68959518850 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
@@ -28,7 +28,6 @@ import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -79,8 +78,7 @@ public class TestLockManager extends HoodieCommonTestHarness {
@ValueSource(booleans = {true, false})
void testLockAndUnlock(boolean multiWriter) {
HoodieWriteConfig writeConfig = multiWriter ? getMultiWriterWriteConfig()
: getSingleWriterWriteConfig();
- LockManager lockManager = new LockManager(writeConfig,
- (FileSystem) this.metaClient.getStorage().getFileSystem());
+ LockManager lockManager = new LockManager(writeConfig,
this.metaClient.getStorage());
LockManager mockLockManager = Mockito.spy(lockManager);
assertDoesNotThrow(() -> {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
index 31f113d1890..12e660477ed 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -133,7 +132,7 @@ public class TestRepairUtils {
Set<String> expectedPaths = partitionToFileIdAndNameMap.entrySet().stream()
.flatMap(entry ->
entry.getValue().stream()
- .map(fileInfo -> new Path(entry.getKey(),
fileInfo.getValue()).toString())
+ .map(fileInfo -> new StoragePath(entry.getKey(),
fileInfo.getValue()).toString())
.collect(Collectors.toList())
.stream()
).collect(Collectors.toSet());
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 809f01b5576..20c85a396c7 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -85,7 +85,6 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
@@ -690,7 +689,7 @@ public abstract class HoodieWriterClientTestHarness extends
HoodieCommonTestHarn
String markerName =
MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
markerFolderPath, storage, context, 1).values().stream()
.flatMap(Collection::stream).findFirst().get();
- partitionPath = new Path(markerFolderPath,
markerName).getParent().toString();
+ partitionPath = new StoragePath(markerFolderPath,
markerName).getParent().toString();
} else {
partitionPath = storage.globEntries(
new StoragePath(String.format("%s/*/*/*/*",
markerFolderPath)), path ->
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index b1049e1d73c..c99e801b6ba 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K,
O>
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
- (FileSystem) getStorage().getFileSystem(),
+ getStorage(),
getWriterSchema(),
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 040c7d5b514..0483096ce9a 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -32,7 +32,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +60,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
- (FileSystem) getStorage().getFileSystem(),
+ getStorage(),
getWriterSchema(),
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 5bbc744c32f..b19a202cc77 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -48,9 +48,10 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
@@ -58,7 +59,6 @@ import
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +83,7 @@ public class SparkBootstrapCommitActionExecutor<T>
private static final Logger LOG =
LoggerFactory.getLogger(SparkBootstrapCommitActionExecutor.class);
protected String bootstrapSchema = null;
- private transient FileSystem bootstrapSourceFileSystem;
+ private transient HoodieStorage bootstrapSourceStorage;
public SparkBootstrapCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config,
@@ -100,7 +100,7 @@ public class SparkBootstrapCommitActionExecutor<T>
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
WriteOperationType.BOOTSTRAP,
extraMetadata);
- bootstrapSourceFileSystem =
HadoopFSUtils.getFs(config.getBootstrapSourceBasePath(), storageConf);
+ bootstrapSourceStorage =
HoodieStorageUtils.getStorage(config.getBootstrapSourceBasePath(), storageConf);
}
private void validate() {
@@ -265,7 +265,7 @@ public class SparkBootstrapCommitActionExecutor<T>
*/
private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>>
listAndProcessSourcePartitions() throws IOException {
List<Pair<String, List<HoodieFileStatus>>> folders =
BootstrapUtils.getAllLeafFoldersWithFiles(
- table.getBaseFileFormat(), bootstrapSourceFileSystem,
config.getBootstrapSourceBasePath(), context);
+ table.getBaseFileFormat(), bootstrapSourceStorage,
config.getBootstrapSourceBasePath(), context);
LOG.info("Fetching Bootstrap Schema !!");
HoodieBootstrapSchemaProvider sourceSchemaProvider = new
HoodieSparkBootstrapSchemaProvider(config);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
index 97bcb8806b9..eca6cafe60a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
@@ -24,7 +24,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -72,7 +71,7 @@ public class TestBootstrapUtils extends HoodieClientTestBase {
List<Pair<String, List<HoodieFileStatus>>> collected =
BootstrapUtils.getAllLeafFoldersWithFiles(
metaClient.getTableConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
+ metaClient.getStorage(),
basePath, context);
assertEquals(3, collected.size());
collected.forEach(k -> assertEquals(2, k.getRight().size()));
@@ -81,7 +80,7 @@ public class TestBootstrapUtils extends HoodieClientTestBase {
collected =
BootstrapUtils.getAllLeafFoldersWithFiles(
metaClient.getTableConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
+ metaClient.getStorage(),
basePath + "/" + folders.get(0), context);
assertEquals(1, collected.size());
collected.forEach(k -> assertEquals(2, k.getRight().size()));
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
index 206e243ba17..38f7b60b5ee 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java
@@ -39,14 +39,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanner;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -64,6 +63,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
@@ -71,11 +71,11 @@ import static org.mockito.Mockito.when;
*/
public class TestCleanActionExecutor {
- private static final StorageConfiguration<Configuration> CONF =
getDefaultStorageConf();
+ private static final StorageConfiguration<?> CONF = getDefaultStorageConf();
private final HoodieEngineContext context = new
HoodieLocalEngineContext(CONF);
private final HoodieTable<?, ?, ?, ?> mockHoodieTable =
mock(HoodieTable.class);
private HoodieTableMetaClient metaClient;
- private FileSystem fs;
+ private HoodieStorage storage;
private static String PARTITION1 = "partition1";
@@ -88,12 +88,9 @@ public class TestCleanActionExecutor {
when(mockHoodieTable.getMetaClient()).thenReturn(metaClient);
HoodieTableConfig tableConfig = new HoodieTableConfig();
when(metaClient.getTableConfig()).thenReturn(tableConfig);
- HoodieStorage storage = mock(HoodieStorage.class);
+ storage = spy(HoodieStorageUtils.getStorage(CONF));
when(metaClient.getStorage()).thenReturn(storage);
when(mockHoodieTable.getStorage()).thenReturn(storage);
- fs = mock(FileSystem.class);
- when(storage.getFileSystem()).thenReturn(fs);
- when(fs.getConf()).thenReturn(CONF.unwrap());
}
@ParameterizedTest
@@ -102,23 +99,26 @@ public class TestCleanActionExecutor {
HoodieWriteConfig config = getCleanByCommitsConfig();
String fileGroup = UUID.randomUUID() + "-0";
HoodieBaseFile baseFile = new
HoodieBaseFile(String.format("/tmp/base/%s_1-0-1_%s.parquet", fileGroup,
"001"));
- FileSystem localFs = new
Path(baseFile.getPath()).getFileSystem(CONF.unwrap());
- Path filePath = new Path(baseFile.getPath());
- localFs.create(filePath);
+ HoodieStorage localStorage =
HoodieStorageUtils.getStorage(baseFile.getPath(), CONF);
+ StoragePath filePath = new StoragePath(baseFile.getPath());
+
if (failureType == CleanFailureType.TRUE_ON_DELETE) {
- when(fs.delete(filePath, false)).thenReturn(true);
+ when(storage.deleteFile(filePath)).thenReturn(true);
} else if (failureType ==
CleanFailureType.FALSE_ON_DELETE_IS_EXISTS_FALSE) {
- when(fs.delete(filePath, false)).thenReturn(false);
- when(fs.exists(filePath)).thenReturn(false);
+ when(storage.deleteFile(filePath)).thenReturn(false);
+ when(storage.exists(filePath)).thenReturn(false);
} else if (failureType == CleanFailureType.FALSE_ON_DELETE_IS_EXISTS_TRUE)
{
- when(fs.delete(filePath, false)).thenReturn(false);
- when(fs.exists(filePath)).thenReturn(true);
+ when(storage.deleteFile(filePath)).thenReturn(false);
+ when(storage.exists(filePath)).thenReturn(true);
} else if (failureType == CleanFailureType.FILE_NOT_FOUND_EXC_ON_DELETE) {
- when(fs.delete(filePath, false)).thenThrow(new
FileNotFoundException("throwing file not found exception"));
+ when(storage.deleteFile(filePath)).thenThrow(new
FileNotFoundException("throwing file not found exception"));
} else {
// run time exception
- when(fs.delete(filePath, false)).thenThrow(new
RuntimeException("throwing run time exception"));
+ when(storage.deleteFile(filePath)).thenThrow(new
RuntimeException("throwing run time exception"));
}
+ // we have to create the actual file after setting up mock logic for
storage
+ // otherwise the file created would be deleted when setting up because
storage is a spy
+ localStorage.create(filePath);
Map<String, List<HoodieCleanFileInfo>> partitionCleanFileInfoMap = new
HashMap<>();
List<HoodieCleanFileInfo> cleanFileInfos = Collections.singletonList(new
HoodieCleanFileInfo(baseFile.getPath(), false));
@@ -165,7 +165,7 @@ public class TestCleanActionExecutor {
});
}
- private void assertCleanExecutionSuccess(CleanActionExecutor
cleanActionExecutor, Path filePath) {
+ private void assertCleanExecutionSuccess(CleanActionExecutor
cleanActionExecutor, StoragePath filePath) {
HoodieCleanMetadata cleanMetadata = cleanActionExecutor.execute();
assertTrue(cleanMetadata.getPartitionMetadata().containsKey(PARTITION1));
HoodieCleanPartitionMetadata cleanPartitionMetadata =
cleanMetadata.getPartitionMetadata().get(PARTITION1);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index f4d29a138fe..65fb5073d7a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -19,6 +19,8 @@
package org.apache.hudi.common.fs;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodiePath;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -605,6 +607,31 @@ public class FSUtils {
return false;
}
+ public static HoodiePath fromStoragePath(StoragePath path) {
+ if (null == path) {
+ return null;
+ }
+ return HoodiePath.newBuilder().setUri(path.toString()).build();
+ }
+
+ public static HoodieFileStatus fromPathInfo(StoragePathInfo pathInfo) {
+ if (null == pathInfo) {
+ return null;
+ }
+
+ HoodieFileStatus fStatus = new HoodieFileStatus();
+
+ fStatus.setPath(fromStoragePath(pathInfo.getPath()));
+ fStatus.setLength(pathInfo.getLength());
+ fStatus.setIsDir(pathInfo.isDirectory());
+ fStatus.setBlockReplication((int) pathInfo.getBlockReplication());
+ fStatus.setBlockSize(pathInfo.getBlockSize());
+ fStatus.setModificationTime(pathInfo.getModificationTime());
+ fStatus.setAccessTime(pathInfo.getModificationTime());
+
+ return fStatus;
+ }
+
/**
* Processes sub-path in parallel.
*
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/SerializablePath.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/SerializablePath.java
deleted file mode 100644
index c814a3ed969..00000000000
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/SerializablePath.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.hadoop.fs;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.Objects;
-
-/**
- * {@link Serializable} wrapper encapsulating {@link Path}
- */
-public class SerializablePath implements Serializable {
-
- private Path path;
-
- public SerializablePath(Path path) {
- this.path = path;
- }
-
- public Path get() {
- return path;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeObject(path.toUri());
- }
-
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- URI uri = (URI) in.readObject();
- path = new CachingPath(uri);
- }
-
- @Override
- public String toString() {
- return path.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SerializablePath that = (SerializablePath) o;
- return Objects.equals(path, that.path);
- }
-}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 10826c7aace..e203f502a7d 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -211,6 +211,18 @@ public class HoodieHadoopStorage extends HoodieStorage {
.collect(Collectors.toList());
}
+ @Override
+ public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,
+ StoragePathFilter filter)
throws IOException {
+ return Arrays.stream(fs.listStatus(
+ pathList.stream()
+ .map(HadoopFSUtils::convertToHadoopPath)
+ .toArray(Path[]::new),
+ e -> filter.accept(convertToStoragePath(e))))
+ .map(HadoopFSUtils::convertToStoragePathInfo)
+ .collect(Collectors.toList());
+ }
+
@Override
public List<StoragePathInfo> globEntries(StoragePath pathPattern)
throws IOException {
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index fc5ae52de54..18c21d73b3b 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -426,6 +426,26 @@ public abstract class HoodieStorage implements Closeable {
return result;
}
+ /**
+ * Lists the file info of the direct files/directories in the given list of
paths
+ * and filters the results, if the paths are directory.
+ *
+ * @param pathList given path list.
+ * @param filter filter to apply.
+ * @return the list of path info of the files/directories in the given paths.
+ * @throws FileNotFoundException when the path does not exist.
+ * @throws IOException IO error.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public List<StoragePathInfo> listDirectEntries(List<StoragePath> pathList,
+ StoragePathFilter filter)
throws IOException {
+ List<StoragePathInfo> result = new ArrayList<>();
+ for (StoragePath path : pathList) {
+ result.addAll(listDirectEntries(path, filter));
+ }
+ return result;
+ }
+
/**
* Returns all the files that match the pathPattern and are not checksum
files.
*
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
index c953fbab7a9..0f6fa99e284 100644
---
a/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
+++
b/hudi-io/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageBase.java
@@ -270,6 +270,41 @@ public abstract class TestHoodieStorageBase {
e -> e.getParent().getName().equals("y") &&
e.getName().contains("1")));
}
+ @Test
+ public void testListingWithFilter() throws IOException {
+ HoodieStorage storage = getStorage();
+ // Full list:
+ // w/1.file
+ // w/2.file
+ // x/1.file
+ // x/2.file
+ // x/y/1.file
+ // x/y/2.file
+ // x/z/1.file
+ // x/z/2.file
+ prepareFilesOnStorage(storage);
+
+ validatePathInfoList(
+ Arrays.stream(new StoragePathInfo[] {
+ getStoragePathInfo("x/y/2.file", false)
+ }).collect(Collectors.toList()),
+ storage.listDirectEntries(
+ new StoragePath(getTempDir(), "x/y"),
+ path -> path.getName().contains("2")));
+
+ validatePathInfoList(
+ Arrays.stream(new StoragePathInfo[] {
+ getStoragePathInfo("w/2.file", false),
+ getStoragePathInfo("x/y/2.file", false),
+ getStoragePathInfo("x/z/2.file", false)
+ }).collect(Collectors.toList()),
+ storage.listDirectEntries(Arrays.stream(new StoragePath[] {
+ new StoragePath(getTempDir(), "w"),
+ new StoragePath(getTempDir(), "x/y"),
+ new StoragePath(getTempDir(), "x/z")
+ }).collect(Collectors.toList()), path ->
path.getName().equals("2.file")));
+ }
+
@Test
public void testFileNotFound() throws IOException {
HoodieStorage storage = getStorage();
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 99fcdcbf8a3..6969357763e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -177,7 +177,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
String filePath =
HadoopFSUtils.toPath(
BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
+ metaClient.getStorage(),
srcPath, context).stream().findAny().map(p ->
p.getValue().stream().findAny())
.orElse(null).get().getPath()).toString();
HoodieAvroParquetReader parquetReader =
@@ -286,7 +286,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
assertEquals(0L,
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(), basePath,
context)
+ metaClient.getStorage(), basePath, context)
.stream().mapToLong(f -> f.getValue().size()).sum());
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -314,7 +314,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords,
partitions, updateSPath);
JavaRDD<HoodieRecord> updateBatch =
generateInputBatch(jsc,
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
updateSPath, context),
+ metaClient.getStorage(), updateSPath, context),
schema);
String newInstantTs = client.startCommit();
client.upsert(updateBatch, newInstantTs);
@@ -389,7 +389,7 @@ public class TestBootstrap extends
HoodieSparkClientTestBase {
if (checkNumRawFiles) {
List<HoodieFileStatus> files =
BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
+ metaClient.getStorage(),
bootstrapBasePath, context).stream().flatMap(x ->
x.getValue().stream()).collect(Collectors.toList());
assertEquals(files.size() * numVersions,
sqlContext.sql("select distinct _hoodie_file_name from
bootstrapped").count());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index f8c8527f413..613e620fc27 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -60,7 +60,6 @@ import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.OrcFile;
@@ -157,7 +156,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
}
String filePath =
HadoopFSUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(
metaClient.getTableConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
+ metaClient.getStorage(),
srcPath, context).stream().findAny().map(p ->
p.getValue().stream().findAny())
.orElse(null).get().getPath()).toString();
Reader orcReader =
@@ -271,7 +270,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
assertEquals(0L,
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
basePath, context)
+ metaClient.getStorage(), basePath, context)
.stream().flatMap(f -> f.getValue().stream()).count());
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -300,7 +299,7 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
JavaRDD<HoodieRecord> updateBatch =
generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(
metaClient.getTableConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
updateSPath, context),
+ metaClient.getStorage(), updateSPath, context),
schema);
String newInstantTs = client.startCommit();
client.upsert(updateBatch, newInstantTs);
@@ -374,8 +373,8 @@ public class TestOrcBootstrap extends
HoodieSparkClientTestBase {
if (checkNumRawFiles) {
List<HoodieFileStatus> files =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
- (FileSystem) metaClient.getStorage().getFileSystem(),
- bootstrapBasePath, context).stream().flatMap(x ->
x.getValue().stream())
+ metaClient.getStorage(), bootstrapBasePath, context)
+ .stream().flatMap(x -> x.getValue().stream())
.collect(Collectors.toList());
assertEquals(files.size() * numVersions,
sqlContext.sql("select distinct _hoodie_file_name from
bootstrapped").count());