This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 95cdfa010a9 [HUDI-7631] Clean up usage of CachingPath outside 
hudi-common module (#11059)
95cdfa010a9 is described below

commit 95cdfa010a93c6c5f6d3b85263eb7f2433f9f8a6
Author: Vova Kolmakov <[email protected]>
AuthorDate: Sun Apr 21 11:58:48 2024 +0700

    [HUDI-7631] Clean up usage of CachingPath outside hudi-common module 
(#11059)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 57 ++++++++--------------
 .../hudi/client/utils/SparkPartitionUtils.java     |  4 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java | 28 +++++------
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   | 20 ++++----
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  4 ++
 .../view/HoodieTablePreCommitFileSystemView.java   |  4 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 11 ++---
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |  2 +-
 .../utilities/HoodieMetadataTableValidator.java    |  5 +-
 .../utilities/streamer/SparkSampleWritesUtils.java | 15 +++---
 10 files changed, 65 insertions(+), 85 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1fa5877c353..f20643a9e95 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -69,9 +69,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.hadoop.fs.CachingPath;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.hadoop.fs.SerializablePath;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -80,9 +77,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -669,15 +663,14 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
   private List<DirectoryInfo> listAllPartitionsFromFilesystem(String 
initializationTime) {
-    List<SerializablePath> pathsToList = new LinkedList<>();
-    pathsToList.add(new SerializablePath(new 
CachingPath(dataWriteConfig.getBasePath())));
+    List<StoragePath> pathsToList = new LinkedList<>();
+    pathsToList.add(new StoragePath(dataWriteConfig.getBasePath()));
 
     List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
     final int fileListingParallelism = 
metadataWriteConfig.getFileListingParallelism();
-    SerializableConfiguration conf = new 
SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = 
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
     final String datasetBasePath = dataMetaClient.getBasePathV2().toString();
-    SerializablePath serializableBasePath = new SerializablePath(new 
CachingPath(datasetBasePath));
+    StoragePath storageBasePath = new StoragePath(datasetBasePath);
 
     while (!pathsToList.isEmpty()) {
       // In each round we will list a section of directories
@@ -685,9 +678,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       // List all directories in parallel
       engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + 
numDirsToList + " partitions from filesystem");
       List<DirectoryInfo> processedDirectories = 
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
-        FileSystem fs = path.get().getFileSystem(conf.get());
-        String relativeDirPath = 
FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get());
-        return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), 
initializationTime);
+        String relativeDirPath = 
FSUtils.getRelativePartitionPath(storageBasePath, path);
+        return new DirectoryInfo(relativeDirPath, 
metadataMetaClient.getStorage().listDirectEntries(path), initializationTime);
       }, numDirsToList);
 
       pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, 
pathsToList.size()));
@@ -708,9 +700,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
           partitionsToBootstrap.add(dirInfo);
         } else {
           // Add sub-dirs to the queue
-          pathsToList.addAll(dirInfo.getSubDirectories().stream()
-              .map(path -> new SerializablePath(new CachingPath(path.toUri())))
-              .collect(Collectors.toList()));
+          pathsToList.addAll(dirInfo.getSubDirectories());
         }
       }
     }
@@ -727,14 +717,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   private List<DirectoryInfo> listAllPartitionsFromMDT(String 
initializationTime) throws IOException {
     List<DirectoryInfo> dirinfoList = new LinkedList<>();
     List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
-        .map(partitionPath -> dataWriteConfig.getBasePath() + "/" + 
partitionPath).collect(Collectors.toList());
-    Map<String, FileStatus[]> partitionFileMap = 
metadata.getAllFilesInPartitions(allPartitionPaths)
-        .entrySet()
-        .stream()
-        .collect(Collectors.toMap(e -> e.getKey(),
-            e -> e.getValue().stream().map(status -> 
HadoopFSUtils.convertToHadoopFileStatus(status))
-                .toArray(FileStatus[]::new)));
-    for (Map.Entry<String, FileStatus[]> entry : partitionFileMap.entrySet()) {
+        .map(partitionPath -> dataWriteConfig.getBasePath() + 
StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
+    Map<String, List<StoragePathInfo>> partitionFileMap = 
metadata.getAllFilesInPartitions(allPartitionPaths);
+    for (Map.Entry<String, List<StoragePathInfo>> entry : 
partitionFileMap.entrySet()) {
       dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), 
initializationTime));
     }
     return dirinfoList;
@@ -1605,31 +1590,31 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     // Map of filenames within this partition to their respective sizes
     private final HashMap<String, Long> filenameToSizeMap;
     // List of directories within this partition
-    private final List<Path> subDirectories = new ArrayList<>();
+    private final List<StoragePath> subDirectories = new ArrayList<>();
     // Is this a hoodie partition
     private boolean isHoodiePartition = false;
 
-    public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String 
maxInstantTime) {
+    public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, 
String maxInstantTime) {
       this.relativePath = relativePath;
 
       // Pre-allocate with the maximum length possible
-      filenameToSizeMap = new HashMap<>(fileStatus.length);
+      filenameToSizeMap = new HashMap<>(pathInfos.size());
 
-      for (FileStatus status : fileStatus) {
-        if (status.isDirectory()) {
+      for (StoragePathInfo pathInfo : pathInfos) {
+        if (pathInfo.isDirectory()) {
           // Ignore .hoodie directory as there cannot be any partitions inside 
it
-          if 
(!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
-            this.subDirectories.add(status.getPath());
+          if 
(!pathInfo.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+            this.subDirectories.add(pathInfo.getPath());
           }
-        } else if 
(status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
 {
+        } else if 
(pathInfo.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
 {
           // Presence of partition meta file implies this is a HUDI partition
           this.isHoodiePartition = true;
-        } else if (FSUtils.isDataFile(status.getPath())) {
+        } else if (FSUtils.isDataFile(pathInfo.getPath())) {
           // Regular HUDI data file (base file or log file)
-          String dataFileCommitTime = 
FSUtils.getCommitTime(status.getPath().getName());
+          String dataFileCommitTime = 
FSUtils.getCommitTime(pathInfo.getPath().getName());
           // Limit the file listings to files which were created before the 
maxInstant time.
           if (HoodieTimeline.compareTimestamps(dataFileCommitTime, 
LESSER_THAN_OR_EQUALS, maxInstantTime)) {
-            filenameToSizeMap.put(status.getPath().getName(), status.getLen());
+            filenameToSizeMap.put(pathInfo.getPath().getName(), 
pathInfo.getLength());
           }
         }
       }
@@ -1647,7 +1632,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       return isHoodiePartition;
     }
 
-    List<Path> getSubDirectories() {
+    List<StoragePath> getSubDirectories() {
       return subDirectories;
     }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
index d6545f247b6..e8db1b3515d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
@@ -22,7 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hadoop.fs.CachingPath;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -43,7 +43,7 @@ public class SparkPartitionUtils {
     return HoodieSparkUtils.parsePartitionColumnValues(
         partitionFields.get(),
         partitionPath,
-        new CachingPath(basePath),
+        new StoragePath(basePath),
         AvroConversionUtils.convertAvroSchemaToStructType(writerSchema),
         hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
         sparkParsePartitionUtil,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 0d164f379fe..2a8c395d0d5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -34,14 +34,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.hadoop.fs.CachingPath;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -67,7 +64,7 @@ public class HoodieRowCreateHandle implements Serializable {
   private final HoodieWriteConfig writeConfig;
 
   private final String partitionPath;
-  private final Path path;
+  private final StoragePath path;
   private final String fileId;
 
   private final boolean populateMetaFields;
@@ -116,12 +113,11 @@ public class HoodieRowCreateHandle implements 
Serializable {
     this.currTimer = HoodieTimer.start();
 
     HoodieStorage storage = table.getMetaClient().getStorage();
-    FileSystem fs = (FileSystem) storage.getFileSystem();
 
     String writeToken = getWriteToken(taskPartitionId, taskId, taskEpochId);
     String fileName = FSUtils.makeBaseFileName(instantTime, writeToken, 
this.fileId,
         table.getBaseFileExtension());
-    this.path = makeNewPath(fs, partitionPath, fileName, writeConfig);
+    this.path = makeNewPath(storage, partitionPath, fileName, writeConfig);
 
     this.populateMetaFields = writeConfig.populateMetaFields();
     this.fileName = UTF8String.fromString(path.getName());
@@ -147,13 +143,12 @@ public class HoodieRowCreateHandle implements 
Serializable {
 
       createMarkerFile(partitionPath, fileName, instantTime, table, 
writeConfig);
 
-      this.fileWriter = 
HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(
-          new StoragePath(path.toUri()), table, writeConfig, structType);
+      this.fileWriter = 
HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(path, table, 
writeConfig, structType);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize file writer for 
path " + path, e);
     }
 
-    LOG.info("New handle created for partition: " + partitionPath + " with 
fileId " + fileId);
+    LOG.info("New handle created for partition: {} with fileId {}", 
partitionPath, fileId);
   }
 
   /**
@@ -242,9 +237,8 @@ public class HoodieRowCreateHandle implements Serializable {
     stat.setNumInserts(writeStatus.getTotalRecords());
     stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
     stat.setFileId(fileId);
-    stat.setPath(new StoragePath(writeConfig.getBasePath()), new 
StoragePath(path.toUri()));
-    long fileSizeInBytes = 
FSUtils.getFileSize(table.getMetaClient().getStorage(),
-        new StoragePath(path.toUri()));
+    stat.setPath(new StoragePath(writeConfig.getBasePath()), path);
+    long fileSizeInBytes = 
FSUtils.getFileSize(table.getMetaClient().getStorage(), path);
     stat.setTotalWriteBytes(fileSizeInBytes);
     stat.setFileSizeInBytes(fileSizeInBytes);
     stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
@@ -261,16 +255,16 @@ public class HoodieRowCreateHandle implements 
Serializable {
     return path.getName();
   }
 
-  private static Path makeNewPath(FileSystem fs, String partitionPath, String 
fileName, HoodieWriteConfig writeConfig) {
-    Path path = 
FSUtils.constructAbsolutePathInHadoopPath(writeConfig.getBasePath(), 
partitionPath);
+  private static StoragePath makeNewPath(HoodieStorage storage, String 
partitionPath, String fileName, HoodieWriteConfig writeConfig) {
+    StoragePath path = new StoragePath(writeConfig.getBasePath(), 
partitionPath);
     try {
-      if (!fs.exists(path)) {
-        fs.mkdirs(path); // create a new partition as needed.
+      if (!storage.exists(path)) {
+        storage.createDirectory(path); // create a new partition as needed.
       }
     } catch (IOException e) {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
-    return new CachingPath(path.toString(), fileName);
+    return new StoragePath(path, fileName);
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 3393da6bd83..7febf2a2ced 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.hadoop.fs.CachingPath
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.util.ExceptionWrappingIterator
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
@@ -237,7 +237,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
 
   def parsePartitionColumnValues(partitionColumns: Array[String],
                                  partitionPath: String,
-                                 basePath: Path,
+                                 basePath: StoragePath,
                                  schema: StructType,
                                  timeZoneId: String,
                                  sparkParsePartitionUtil: 
SparkParsePartitionUtil,
@@ -246,7 +246,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
       // This is a non-partitioned table
       Array.empty
     } else {
-      val partitionFragments = partitionPath.split("/")
+      val partitionFragments = partitionPath.split(StoragePath.SEPARATOR)
       if (partitionFragments.length != partitionColumns.length) {
         if (partitionColumns.length == 1) {
           // If the partition column size is not equal to the partition 
fragment size
@@ -290,9 +290,9 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
             } else {
               partition
             }
-        }.mkString("/")
+        }.mkString(StoragePath.SEPARATOR)
 
-        val pathWithPartitionName = new CachingPath(basePath, 
CachingPath.createRelativePathUnsafe(partitionWithName))
+        val pathWithPartitionName = new StoragePath(basePath, 
partitionWithName)
         val partitionSchema = StructType(schema.fields.filter(f => 
partitionColumns.contains(f.name)))
         val partitionValues = parsePartitionPath(pathWithPartitionName, 
partitionSchema, timeZoneId,
           sparkParsePartitionUtil, basePath, shouldValidatePartitionCols)
@@ -301,14 +301,14 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
     }
   }
 
-  private def parsePartitionPath(partitionPath: Path, partitionSchema: 
StructType, timeZoneId: String,
-                                 sparkParsePartitionUtil: 
SparkParsePartitionUtil, basePath: Path,
+  private def parsePartitionPath(partitionPath: StoragePath, partitionSchema: 
StructType, timeZoneId: String,
+                                 sparkParsePartitionUtil: 
SparkParsePartitionUtil, basePath: StoragePath,
                                  shouldValidatePartitionCols: Boolean): 
Seq[Any] = {
     val partitionDataTypes = partitionSchema.map(f => f.name -> 
f.dataType).toMap
     sparkParsePartitionUtil.parsePartition(
-      partitionPath,
+      new Path(partitionPath.toUri),
       typeInference = false,
-      Set(basePath),
+      Set(new Path(basePath.toUri)),
       partitionDataTypes,
       getTimeZone(timeZoneId),
       validatePartitionValues = shouldValidatePartitionCols
@@ -329,7 +329,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
         partitionVals(index) = fragment.substring(fragment.indexOf("=") + 1)
 
       } else {
-        partitionVals(index) += "/" + fragment
+        partitionVals(index) += StoragePath.SEPARATOR + fragment
       }
     }
     return partitionVals
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 4eeee2d8ba5..afd28f0ebac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -510,6 +510,10 @@ public class FSUtils {
     return isBaseFile(path) || isLogFile(path);
   }
 
+  public static boolean isDataFile(StoragePath path) {
+    return isBaseFile(path) || isLogFile(path);
+  }
+
   /**
    * Get the names of all the base and log files in the given partition path.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index afae30ca8e2..ea6b8f429bd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -21,7 +21,7 @@ package org.apache.hudi.common.table.view;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.hadoop.fs.CachingPath;
+import org.apache.hudi.storage.StoragePath;
 
 import java.util.Collections;
 import java.util.List;
@@ -71,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView {
     Map<String, HoodieBaseFile> newFilesWrittenForPartition = 
filesWritten.stream()
         .filter(file -> partitionStr.equals(file.getPartitionPath()))
         .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat -> 
-            new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(), 
writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, 
null)));
+            new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePath(), 
writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, 
null)));
 
     Stream<HoodieBaseFile> committedBaseFiles = 
this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
     Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f8784192e41..b898a71c38b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -39,7 +39,7 @@ import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.hadoop.fs.{CachingPath, HadoopFSUtils}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -68,7 +68,6 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter, 
PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 
-import java.net.URI
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
@@ -492,14 +491,14 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   protected def getPartitionColumnsAsInternalRowInternal(file: 
StoragePathInfo, basePath: Path,
                                                          
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     if (extractPartitionValuesFromPartitionPath) {
-      val tablePathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(basePath)
-      val partitionPathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(new 
Path(file.getPath.getParent.toUri))
-      val relativePath = new 
URI(tablePathWithoutScheme.toString).relativize(new 
URI(partitionPathWithoutScheme.toString)).toString
+      val tablePathWithoutScheme = new 
StoragePath(basePath.toUri).getPathWithoutSchemeAndAuthority
+      val partitionPathWithoutScheme = new 
StoragePath(file.getPath.getParent.toUri).getPathWithoutSchemeAndAuthority
+      val relativePath = 
tablePathWithoutScheme.toUri.relativize(partitionPathWithoutScheme.toUri).toString
       val timeZoneId = conf.get("timeZone", 
sparkSession.sessionState.conf.sessionLocalTimeZone)
       val rowValues = HoodieSparkUtils.parsePartitionColumnValues(
         partitionColumns,
         relativePath,
-        basePath,
+        new StoragePath(basePath.toUri),
         tableStructSchema,
         timeZoneId,
         sparkAdapter.getSparkParsePartitionUtil,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 02b72de0cb9..c0d5864c73d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -401,7 +401,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   }
 
   protected def doParsePartitionColumnValues(partitionColumns: Array[String], 
partitionPath: String): Array[Object] = {
-    HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, 
partitionPath, new Path(getBasePath.toUri), schema,
+    HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, 
partitionPath, getBasePath, schema,
       configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, 
SQLConf.get.sessionLocalTimeZone),
       sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b5b2d7c0485..75ff9a41fc0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -103,7 +103,6 @@ import java.util.stream.Collectors;
 import scala.Tuple2;
 
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static 
org.apache.hudi.hadoop.fs.CachingPath.getPathWithoutSchemeAndAuthority;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 
 /**
@@ -1244,8 +1243,8 @@ public class HoodieMetadataTableValidator implements 
Serializable {
   }
 
   private String getRelativePath(String basePath, String absoluteFilePath) {
-    String basePathStr = getPathWithoutSchemeAndAuthority(new 
Path(basePath)).toString();
-    String absoluteFilePathStr = getPathWithoutSchemeAndAuthority(new 
Path(absoluteFilePath)).toString();
+    String basePathStr = new 
StoragePath(basePath).getPathWithoutSchemeAndAuthority().toString();
+    String absoluteFilePathStr = new 
StoragePath(absoluteFilePath).getPathWithoutSchemeAndAuthority().toString();
 
     if (!absoluteFilePathStr.startsWith(basePathStr)) {
       throw new IllegalArgumentException("File path does not belong to the 
base path! basePath="
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
index e7dca04bbe7..01c2ab7ef11 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
@@ -32,12 +32,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.fs.CachingPath;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
@@ -80,7 +79,7 @@ public class SparkSampleWritesUtils {
       Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt, 
writeConfig, instantTime);
       if (result.getLeft()) {
         long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
-        LOG.info("Overwriting record size estimate to " + avgSize);
+        LOG.info("Overwriting record size estimate to {}", avgSize);
         TypedProperties props = writeConfig.getProps();
         props.put(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
String.valueOf(avgSize));
         return 
Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build());
@@ -121,7 +120,7 @@ public class SparkSampleWritesUtils {
         sampleWriteClient.startCommitWithTime(instantTime);
         JavaRDD<WriteStatus> writeStatusRDD = 
sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
         if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
-          LOG.error(String.format("sample writes for table %s failed with 
errors.", writeConfig.getTableName()));
+          LOG.error("sample writes for table {} failed with errors.", 
writeConfig.getTableName());
           if (LOG.isTraceEnabled()) {
             LOG.trace("Printing out the top 100 errors");
             writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws 
-> {
@@ -139,10 +138,10 @@ public class SparkSampleWritesUtils {
   }
 
   private static String getSampleWritesBasePath(JavaSparkContext jsc, 
HoodieWriteConfig writeConfig, String instantTime) throws IOException {
-    Path basePath = new CachingPath(writeConfig.getBasePath(), 
SAMPLE_WRITES_FOLDER_PATH + StoragePath.SEPARATOR + instantTime);
-    FileSystem fs = HadoopFSUtils.getFs(basePath, jsc.hadoopConfiguration());
-    if (fs.exists(basePath)) {
-      fs.delete(basePath, true);
+    StoragePath basePath = new StoragePath(writeConfig.getBasePath(), 
SAMPLE_WRITES_FOLDER_PATH + StoragePath.SEPARATOR + instantTime);
+    HoodieStorage storage = getMetaClient(jsc, 
writeConfig.getBasePath()).getStorage();
+    if (storage.exists(basePath)) {
+      storage.deleteDirectory(basePath);
     }
     return basePath.toString();
   }

Reply via email to the mailing list.