vinothchandar commented on code in PR #10591:
URL: https://github.com/apache/hudi/pull/10591#discussion_r1484910496


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -736,14 +745,15 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
                                     int fileGroupCount) throws IOException {
     String partitionName = 
HoodieIndexUtils.getPartitionNameFromPartitionType(metadataPartition, 
dataMetaClient, dataWriteConfig.getFunctionalIndexConfig().getIndexName());
     // Remove all existing file groups or leftover files in the partition
-    final Path partitionPath = new Path(metadataWriteConfig.getBasePath(), 
partitionName);
-    FileSystem fs = metadataMetaClient.getFs();
+    final HoodieLocation partitionPath = new 
HoodieLocation(metadataWriteConfig.getBasePath(), partitionName);
+    HoodieStorage storage = metadataMetaClient.getHoodieStorage();
     try {
-      final FileStatus[] existingFiles = fs.listStatus(partitionPath);
-      if (existingFiles.length > 0) {
+      final List<HoodieFileStatus> existingFiles = 
storage.listDirectEntries(partitionPath);
+      if (existingFiles.size() > 0) {
         LOG.warn("Deleting all existing files found in MDT partition " + 
partitionName);
-        fs.delete(partitionPath, true);
-        ValidationUtils.checkState(!fs.exists(partitionPath), "Failed to 
delete MDT partition " + partitionName);
+        storage.deleteDirectory(partitionPath);

Review Comment:
   does `deleteDirectory` only work on directories? or any path? even objects.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java:
##########
@@ -55,19 +55,19 @@ public final class RepairUtils {
   /**
    * Tags the instant time of each base or log file from the input file paths.
    *
-   * @param basePath          Base path of the table.
-   * @param allPaths          A {@link List} of file paths to tag.
+   * @param basePath Base path of the table.
+   * @param allPaths A {@link List} of file paths to tag.
    * @return A {@link Map} of instant time in {@link String} to a {@link List} 
of relative file paths.
    */
   public static Map<String, List<String>> tagInstantsOfBaseAndLogFiles(
-      String basePath, List<Path> allPaths) {
+      String basePath, List<HoodieLocation> allPaths) {
     // Instant time -> Set of base and log file paths
     Map<String, List<String>> instantToFilesMap = new HashMap<>();
     allPaths.forEach(path -> {
       String instantTime = FSUtils.getCommitTime(path.getName());
       instantToFilesMap.computeIfAbsent(instantTime, k -> new ArrayList<>());
       instantToFilesMap.get(instantTime).add(
-          FSUtils.getRelativePartitionPath(new Path(basePath), path));
+          FSUtils.getRelativePartitionPathFromLocation(new 
HoodieLocation(basePath), path));

Review Comment:
   rename: extractRelativePartitionPath (drop `FromLocation`)



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -471,22 +541,44 @@ public static FileStatus[] 
getAllDataFilesInPartition(FileSystem fs, Path partit
     }
   }
 
+  public static List<HoodieFileStatus> 
getAllDataFilesInPartition(HoodieStorage storage,
+                                                                  
HoodieLocation partitionPath)
+      throws IOException {
+    final Set<String> validFileExtensions = 
Arrays.stream(HoodieFileFormat.values())
+        
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
+    final String logFileExtension = 
HoodieFileFormat.HOODIE_LOG.getFileExtension();
+
+    try {
+      return storage.listDirectEntries(partitionPath, path -> {
+        String extension = FSUtils.getFileExtension(path.getName());
+        return validFileExtensions.contains(extension) || 
path.getName().contains(logFileExtension);
+      
}).stream().filter(HoodieFileStatus::isFile).collect(Collectors.toList());
+    } catch (IOException e) {
+      // return empty FileStatus if partition does not exist already
+      if (!storage.exists(partitionPath)) {
+        return Collections.emptyList();
+      } else {
+        throw e;
+      }
+    }
+  }
+
   /**
    * Get the latest log file for the passed in file-id in the partition path
    */
-  public static Option<HoodieLogFile> getLatestLogFile(FileSystem fs, Path 
partitionPath, String fileId,
+  public static Option<HoodieLogFile> getLatestLogFile(HoodieStorage storage, 
HoodieLocation partitionPath, String fileId,
                                                        String 
logFileExtension, String deltaCommitTime) throws IOException {
-    return getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, 
logFileExtension, deltaCommitTime));
+    return getLatestLogFile(getAllLogFiles(storage, partitionPath, fileId, 
logFileExtension, deltaCommitTime));
   }
 
   /**
    * Get all the log files for the passed in file-id in the partition path.
    */
-  public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path 
partitionPath, final String fileId,
+  public static Stream<HoodieLogFile> getAllLogFiles(HoodieStorage storage, 
HoodieLocation partitionPath, final String fileId,
       final String logFileExtension, final String deltaCommitTime) throws 
IOException {
     try {
-      PathFilter pathFilter = path -> path.getName().startsWith("." + fileId) 
&& path.getName().contains(logFileExtension);
-      return Arrays.stream(fs.listStatus(partitionPath, pathFilter))
+      HoodieLocationFilter pathFilter = path -> path.getName().startsWith("." 
+ fileId) && path.getName().contains(logFileExtension);

Review Comment:
   rename: StoragePathFilter



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -273,9 +274,9 @@ public void deleteCompletedRollback(HoodieInstant instant) {
     deleteInstantFile(instant);
   }
 
-  public static void deleteInstantFile(FileSystem fs, String metaPath, 
HoodieInstant instant) {
+  public static void deleteInstantFile(HoodieStorage storage, String metaPath, 
HoodieInstant instant) {
     try {
-      fs.delete(new Path(metaPath, instant.getFileName()), false);
+      storage.deleteFile(new HoodieLocation(metaPath, instant.getFileName()));

Review Comment:
   is there an equivalent recursive delete call? (best not to add that as an 
API on HoodieStorage, since it's a common performance trap. It will force 
authors to use FSUtils.*)



##########
hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java:
##########
@@ -231,14 +233,14 @@ public HFileBootstrapIndexReader(HoodieTableMetaClient 
metaClient) {
     /**
      * Helper method to create native HFile Reader.
      *
-     * @param hFilePath  file path.
-     * @param fileSystem file system.
+     * @param hFilePath file path.
+     * @param storage   {@link HoodieStorage} instance.
      */
-    private static HFileReader createReader(String hFilePath, FileSystem 
fileSystem) throws IOException {
+    private static HFileReader createReader(String hFilePath, HoodieStorage 
storage) throws IOException {
       LOG.info("Opening HFile for reading :" + hFilePath);
-      Path path = new Path(hFilePath);
-      long fileSize = fileSystem.getFileStatus(path).getLen();
-      FSDataInputStream stream = fileSystem.open(path);
+      HoodieLocation path = new HoodieLocation(hFilePath);
+      long fileSize = storage.getFileStatus(path).getLength();
+      FSDataInputStream stream = (FSDataInputStream) storage.open(path);

Review Comment:
   this is still a hadoop dependency right? and its in `hudi-common`? are there 
more PRs to remove hadoop from hudi-common?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java:
##########
@@ -34,7 +32,7 @@ public class BaseFile implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  private transient FileStatus fileStatus;
+  private transient HoodieFileStatus fileStatus;

Review Comment:
   we need to be sure to rename the variable as well, when we rename the 
classes. Same for HoodieLocation -> StoragePath, HoodieFileStatus -> 
StoragePathInfo



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -121,7 +122,7 @@ public abstract class AbstractHoodieLogRecordReader {
   // Read the operation metadata field from the avro record
   private final boolean withOperationField;
   // FileSystem

Review Comment:
   fix comment



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -263,7 +267,7 @@ private FileStatus[] 
fetchFilesFromCommitMetadata(HoodieInstant instantToRollbac
    * @throws IOException
    */
   private FileStatus[] fetchFilesFromListFiles(HoodieInstant 
instantToRollback, String partitionPath, String basePath,
-                                               String baseFileExtension, 
HoodieWrapperFileSystem fs)
+                                               String baseFileExtension, 
FileSystem fs)

Review Comment:
   any reason it is harder to eliminate `FileSystem` usage in these places and 
just use `HoodieStorage`? have you just cut these for now, to reduce scope?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java:
##########
@@ -249,10 +252,11 @@ private ValidationOpResult 
validateCompactionOperation(HoodieTableMetaClient met
           Set<HoodieLogFile> logFilesInFileSlice = 
fs.getLogFiles().collect(Collectors.toSet());
           Set<HoodieLogFile> logFilesInCompactionOp = 
operation.getDeltaFileNames().stream().map(dp -> {
             try {
-              FileStatus[] fileStatuses = metaClient.getFs().listStatus(new 
Path(
-                  FSUtils.getPartitionPath(metaClient.getBasePath(), 
operation.getPartitionPath()), new Path(dp)));
-              ValidationUtils.checkArgument(fileStatuses.length == 1, "Expect 
only 1 file-status");
-              return new HoodieLogFile(fileStatuses[0]);
+              List<HoodieFileStatus> fileStatuses = 
metaClient.getHoodieStorage()
+                  .listDirectEntries(new HoodieLocation(

Review Comment:
   `listDirectEntries` or just `listDirectory`



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -592,27 +703,9 @@ public static String getDFSFullPartitionPath(FileSystem 
fs, Path fullPartitionPa
     return fs.getUri() + fullPartitionPath.toUri().getRawPath();
   }
 
-  /**
-   * This is due to HUDI-140 GCS has a different behavior for detecting EOF 
during seek().
-   *
-   * @param fs fileSystem instance.
-   * @return true if the inputstream or the wrapped one is of type 
GoogleHadoopFSInputStream
-   */
-  public static boolean isGCSFileSystem(FileSystem fs) {
-    return fs.getScheme().equals(StorageSchemes.GCS.getScheme());
-  }
-
-  /**
-   * Chdfs will throw {@code IOException} instead of {@code EOFException}. It 
will cause error in isBlockCorrupted().
-   * Wrapped by {@code BoundedFsDataInputStream}, to check whether the desired 
offset is out of the file size in advance.
-   */
-  public static boolean isCHDFileSystem(FileSystem fs) {

Review Comment:
   these may move to StorageSchemes directly? or unused?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -80,37 +84,37 @@ public DirectWriteMarkers(HoodieTable table, String 
instantTime) {
    * @param parallelism parallelism for deletion.
    */
   public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) 
{
-    return FSUtils.deleteDir(context, fs, markerDirPath, parallelism);
+    return FSUtils.deleteDir(context, storage, markerDirPath, parallelism);
   }
 
   /**
    * @return {@code true} if marker directory exists; {@code false} otherwise.
    * @throws IOException
    */
   public boolean doesMarkerDirExist() throws IOException {
-    return fs.exists(markerDirPath);
+    return storage.exists(markerDirPath);
   }
 
   @Override
   public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, 
int parallelism) throws IOException {
     Set<String> dataFiles = new HashSet<>();
 
-    FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
+    List<HoodieFileStatus> topLevelStatuses = 
storage.listDirectEntries(markerDirPath);

Review Comment:
   to be coherent: should `HoodieFileStatus` be renamed to something like 
`StorageLocationInfo`, so it's not misleadingly associated with the similarly 
named Hadoop class?



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -209,30 +229,51 @@ public static String getRelativePartitionPath(Path 
basePath, Path fullPartitionP
         : fullPartitionPathStr.substring(partitionStartIndex + 
basePath.getName().length() + 1);
   }
 
+  public static String getRelativePartitionPathFromLocation(HoodieLocation 
basePath, HoodieLocation fullPartitionPath) {

Review Comment:
   are these unit tested? assume this is modelled after an existing method



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -85,14 +87,15 @@ public class FSUtils {
   private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
   // Log files are of this pattern - 
.b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1
   // Archive log files are of this pattern - .commits_.archive.1_1-0-1
+  public static final String PATH_SEPARATOR = "/";

Review Comment:
   Leaving comment here. is it possible to write new util methods in a separate 
`StorageUtils` class instead? if there are some common methods that are 
FileSystem-free etc, move them over to `StorageUtils`? over time we can reduce 
what's in FSUtils. 
   
   if too cumbersome, can be a follow on



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -178,17 +177,20 @@ public Map<HoodieFileGroupId, String> 
getFileGroupIdAndFullPaths(String basePath
    * @param basePath   The base path
    * @return the file full path to file status mapping
    */
-  public Map<String, FileStatus> getFullPathToFileStatus(Configuration 
hadoopConf, String basePath) {
-    Map<String, FileStatus> fullPathToFileStatus = new HashMap<>();
+  public Map<String, HoodieFileStatus> getFullPathToFileStatus(Configuration 
hadoopConf,
+                                                               String 
basePath) {
+    Map<String, HoodieFileStatus> fullPathToFileStatus = new HashMap<>();
     for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
       // Iterate through all the written files.
       for (HoodieWriteStat stat : stats) {
         String relativeFilePath = stat.getPath();
-        Path fullPath = relativeFilePath != null ? 
FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+        HoodieLocation fullPath = relativeFilePath != null
+            ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
         if (fullPath != null) {
-          long blockSize = HadoopFSUtils.getFs(fullPath.toString(), 
hadoopConf).getDefaultBlockSize(fullPath);
-          FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), 
false, 0, blockSize,
-              0, fullPath);
+          //long blockSize =

Review Comment:
   remove?



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -725,6 +822,44 @@ public static <T> Map<String, T> parallelizeFilesProcess(
     return result;
   }
 
+  public static <T> Map<String, T> parallelizeSubPathProcess(

Review Comment:
   again lmk if these are new methods. seem like reimpl of old ones.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -238,9 +241,9 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     HoodieLogFormatReader logFormatReaderWrapper = null;
     try {
       // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
           logFilePaths.stream()
-              .map(filePath -> new HoodieLogFile(new CachingPath(filePath)))

Review Comment:
   is CachingPath dead now? (love it if so)



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -998,7 +1001,7 @@ private Option<Pair<Function<HoodieRecord, HoodieRecord>, 
Schema>> composeEvolve
    */
   public abstract static class Builder {
 
-    public abstract Builder withFileSystem(FileSystem fs);
+    public abstract Builder withHoodieStorage(HoodieStorage storage);

Review Comment:
   rename: withStorage



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -475,66 +471,20 @@ public void remove() {
   }
 
   /**
-   * Fetch the right {@link FSDataInputStream} to be used by wrapping with 
required input streams.
-   * @param fs instance of {@link FileSystem} in use.
+   * Fetch the right {@link SeekableDataInputStream} to be used by wrapping 
with required input streams.
+   *
+   * @param storage    instance of {@link HoodieStorage} in use.
    * @param bufferSize buffer size to be used.
-   * @return the right {@link FSDataInputStream} as required.
+   * @return the right {@link SeekableDataInputStream} as required.
    */
-  private static FSDataInputStream getFSDataInputStream(FileSystem fs,
-                                                        HoodieLogFile logFile,
-                                                        int bufferSize) {
-    FSDataInputStream fsDataInputStream = null;
+  private static FSDataInputStream getDataInputStream(HoodieStorage storage,
+                                                      HoodieLogFile logFile,
+                                                      int bufferSize) {
     try {
-      fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
+      // return storage.openLogFile(logFile.getLocation(), bufferSize);
+      return (FSDataInputStream) storage.open(logFile.getLocation());
     } catch (IOException e) {
-      throw new HoodieIOException("Exception creating input stream from file: 
" + logFile, e);
+      throw new HoodieIOException("Unable to get input stream for " + logFile, 
e);
     }
-

Review Comment:
   is this all moved under `HadoopHoodieStorage`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java:
##########
@@ -281,36 +281,30 @@ public Writer build() throws IOException {
         rolloverLogWriteToken = rolloverLogWriteToken + suffix;
       }
 
-      Path logPath = new Path(parentPath,
+      HoodieLocation logPath = new HoodieLocation(parentPath,
           FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, 
logVersion, logWriteToken));
       LOG.info("HoodieLogFile on path " + logPath);
       HoodieLogFile logFile = new HoodieLogFile(logPath, fileLen);
 
-      if (bufferSize == null) {

Review Comment:
   is this moved somewhere else?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -475,66 +471,20 @@ public void remove() {
   }
 
   /**
-   * Fetch the right {@link FSDataInputStream} to be used by wrapping with 
required input streams.
-   * @param fs instance of {@link FileSystem} in use.
+   * Fetch the right {@link SeekableDataInputStream} to be used by wrapping 
with required input streams.
+   *
+   * @param storage    instance of {@link HoodieStorage} in use.
    * @param bufferSize buffer size to be used.
-   * @return the right {@link FSDataInputStream} as required.
+   * @return the right {@link SeekableDataInputStream} as required.
    */
-  private static FSDataInputStream getFSDataInputStream(FileSystem fs,
-                                                        HoodieLogFile logFile,
-                                                        int bufferSize) {
-    FSDataInputStream fsDataInputStream = null;
+  private static FSDataInputStream getDataInputStream(HoodieStorage storage,
+                                                      HoodieLogFile logFile,
+                                                      int bufferSize) {
     try {
-      fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
+      // return storage.openLogFile(logFile.getLocation(), bufferSize);

Review Comment:
   remove?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java:
##########
@@ -46,6 +49,7 @@ public class HoodieLogFormatWriter implements 
HoodieLogFormat.Writer {
   private HoodieLogFile logFile;
   private FSDataOutputStream output;
 
+  private final HoodieStorage storage;
   private final FileSystem fs;

Review Comment:
   we need this too?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -874,11 +880,15 @@ public void saveToPendingIndexAction(HoodieInstant 
instant, Option<byte[]> conte
   }
 
   protected void createFileInMetaPath(String filename, Option<byte[]> content, 
boolean allowOverwrite) {
-    Path fullPath = getInstantFileNamePath(filename);
+    HoodieLocation fullPath = getInstantFileNamePath(filename);
     if (allowOverwrite || 
metaClient.getTimelineLayoutVersion().isNullVersion()) {
-      FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
+      FileIOUtils.createFileInLocation(metaClient.getHoodieStorage(), 
fullPath, content);
     } else {
-      metaClient.getFs().createImmutableFileInPath(fullPath, content);
+      try {
+        metaClient.getHoodieStorage().createImmutableFileInPath(fullPath, 
content);
+      } catch (IOException e) {
+        throw new RuntimeException(e);

Review Comment:
   throw a HoodieRuntimeException with a message/context



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileStatusDTO.java:
##########
@@ -38,67 +35,25 @@ public class FileStatusDTO {
   long length;
   @JsonProperty("isdir")
   boolean isdir;
-  @JsonProperty("blockReplication")
-  short blockReplication;
-  @JsonProperty("blocksize")
-  long blocksize;
   @JsonProperty("modificationTime")
   long modificationTime;
-  @JsonProperty("accessTime")
-  long accessTime;
-  @JsonProperty("permission")
-  FSPermissionDTO permission;
-  @JsonProperty("owner")
-  String owner;
-  @JsonProperty("group")
-  String group;
-  @JsonProperty("symlink")
-  FilePathDTO symlink;
 
-  public static FileStatusDTO fromFileStatus(FileStatus fileStatus) {
+  public static FileStatusDTO fromFileStatus(HoodieFileStatus fileStatus) {
     if (null == fileStatus) {
       return null;
     }
 
     FileStatusDTO dto = new FileStatusDTO();
-    try {
-      dto.path = FilePathDTO.fromPath(fileStatus.getPath());
-      dto.length = fileStatus.getLen();
-      dto.isdir = fileStatus.isDirectory();
-      dto.blockReplication = fileStatus.getReplication();
-      dto.blocksize = fileStatus.getBlockSize();
-      dto.modificationTime = fileStatus.getModificationTime();
-      dto.accessTime = fileStatus.getAccessTime();
-      dto.symlink = fileStatus.isSymlink() ? 
FilePathDTO.fromPath(fileStatus.getSymlink()) : null;
-      safeReadAndSetMetadata(dto, fileStatus);
-    } catch (IOException ioe) {
-      throw new HoodieException(ioe);
-    }
-    return dto;
-  }
+    dto.path = FilePathDTO.fromHoodieLocation(fileStatus.getLocation());
+    dto.length = fileStatus.getLength();
+    dto.isdir = fileStatus.isDirectory();
+    dto.modificationTime = fileStatus.getModificationTime();
 
-  /**
-   * Used to safely handle FileStatus calls which might fail on some 
FileSystem implementation.

Review Comment:
   context for removing this?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -94,40 +96,40 @@ public static String stripMarkerFolderPrefix(String 
fullMarkerPath, String marke
   }
 
   /**
-   * @param fileSystem file system to use.
-   * @param markerDir  marker directory.
+   * @param storage   {@link HoodieStorage} to use.
+   * @param markerDir marker directory.
    * @return {@code true} if the MARKERS.type file exists; {@code false} 
otherwise.
    */
-  public static boolean doesMarkerTypeFileExist(FileSystem fileSystem, String 
markerDir) throws IOException {
-    return fileSystem.exists(new Path(markerDir, MARKER_TYPE_FILENAME));
+  public static boolean doesMarkerTypeFileExist(HoodieStorage storage, String 
markerDir) throws IOException {
+    return storage.exists(new HoodieLocation(markerDir, MARKER_TYPE_FILENAME));
   }
 
   /**
    * Reads the marker type from `MARKERS.type` file.
    *
-   * @param fileSystem file system to use.
-   * @param markerDir  marker directory.
+   * @param storage   {@link HoodieStorage} to use.
+   * @param markerDir marker directory.
    * @return the marker type, or empty if the marker type file does not exist.
    */
-  public static Option<MarkerType> readMarkerType(FileSystem fileSystem, 
String markerDir) {
-    Path markerTypeFilePath = new Path(markerDir, MARKER_TYPE_FILENAME);
-    InputStream inputStream = null;
+  public static Option<MarkerType> readMarkerType(HoodieStorage storage, 
String markerDir) {
+    HoodieLocation markerTypeFileLocation = new HoodieLocation(markerDir, 
MARKER_TYPE_FILENAME);
+    InputStream inputSteam = null;
     Option<MarkerType> content = Option.empty();
     try {
-      if (!doesMarkerTypeFileExist(fileSystem, markerDir)) {
+      if (!doesMarkerTypeFileExist(storage, markerDir)) {
         return Option.empty();
       }
-      inputStream = fileSystem.open(markerTypeFilePath);
-      String markerType = FileIOUtils.readAsUTFString(inputStream);
+      inputSteam = storage.open(markerTypeFileLocation);

Review Comment:
   inputStream



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileStatusDTO.java:
##########
@@ -38,67 +35,25 @@ public class FileStatusDTO {
   long length;
   @JsonProperty("isdir")
   boolean isdir;
-  @JsonProperty("blockReplication")

Review Comment:
   these are unused?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -391,6 +392,7 @@ public Boolean isMetadataTable() {
   /**
    * Get the FS implementation for this table.
    */
+  /*

Review Comment:
   need it or remove?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -410,28 +412,50 @@ public HoodieWrapperFileSystem getFs() {
               : new NoOpConsistencyGuard());
     }
     return fs;
+  }*/
+
+  public HoodieStorage getHoodieStorage() {
+    if (storage == null) {
+      ConsistencyGuard consistencyGuard = 
consistencyGuardConfig.isConsistencyCheckEnabled()
+          ? new FailSafeConsistencyGuard(
+          HoodieStorageUtils.getHoodieStorage(metaPath, new 
Configuration(getHadoopConf())),
+          consistencyGuardConfig)
+          : new NoOpConsistencyGuard();
+
+      storage = getHoodieStorageWithWrapperFS(
+          metaPath,
+          getHadoopConf(),
+          fileSystemRetryConfig.isFileSystemActionRetryEnable(),
+          fileSystemRetryConfig.getMaxRetryIntervalMs(),
+          fileSystemRetryConfig.getMaxRetryNumbers(),
+          fileSystemRetryConfig.getInitialRetryIntervalMs(),
+          fileSystemRetryConfig.getRetryExceptions(),
+          consistencyGuard);
+    }
+    return storage;
   }
 
+  /*
   public void setFs(HoodieWrapperFileSystem fs) {
     this.fs = fs;
+  }*/
+
+  public void setHoodieStorage(HoodieStorage storage) {
+    this.storage = storage;
   }
 
-  /**
-   * Return raw file-system.
-   *
-   * @return fs
-   */
-  public FileSystem getRawFs() {
-    return getFs().getFileSystem();
+  public HoodieStorage getRawHoodieStorage() {
+    return HoodieStorageUtils.getRawHoodieStorage(storage);
   }
 
   public Configuration getHadoopConf() {
     return hadoopConf.get();
   }
 
+  /*
   public SerializableConfiguration getSerializableHadoopConf() {
     return hadoopConf;
-  }
+  }*/

Review Comment:
   remove?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java:
##########
@@ -80,18 +77,22 @@ public FileSystemBackedTableMetadata(HoodieEngineContext 
engineContext,
                                        String datasetBasePath) {
     super(engineContext, conf, datasetBasePath);
 
-    FileSystem fs = HadoopFSUtils.getFs(dataBasePath.get(), conf.get());
-    Path metaPath = new Path(dataBasePath.get(), 
HoodieTableMetaClient.METAFOLDER_NAME);
-    TableNotFoundException.checkTableValidity(fs, this.dataBasePath.get(), 
metaPath);
+    HoodieStorage fs = HoodieStorageUtils.getHoodieStorage(dataBasePath, 
conf.get());

Review Comment:
   fs -> storage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to