This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4180f7827fb90558dd5808c3c82d3ad1c6180281 Author: Vova Kolmakov <[email protected]> AuthorDate: Sun Apr 21 11:58:48 2024 +0700 [HUDI-7631] Clean up usage of CachingPath outside hudi-common module (#11059) --- .../metadata/HoodieBackedTableMetadataWriter.java | 57 ++++++++-------------- .../hudi/client/utils/SparkPartitionUtils.java | 4 +- .../hudi/io/storage/row/HoodieRowCreateHandle.java | 28 +++++------ .../scala/org/apache/hudi/HoodieSparkUtils.scala | 20 ++++---- .../java/org/apache/hudi/common/fs/FSUtils.java | 4 ++ .../view/HoodieTablePreCommitFileSystemView.java | 4 +- .../scala/org/apache/hudi/HoodieBaseRelation.scala | 12 ++--- .../apache/hudi/SparkHoodieTableFileIndex.scala | 2 +- .../utilities/HoodieMetadataTableValidator.java | 5 +- .../utilities/streamer/SparkSampleWritesUtils.java | 15 +++--- 10 files changed, 65 insertions(+), 86 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 0714f27d0e8..8970640c6ee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -67,18 +67,12 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; -import org.apache.hudi.hadoop.fs.CachingPath; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.hadoop.fs.SerializablePath; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -593,15 +587,14 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM * @return List consisting of {@code DirectoryInfo} for each partition found. */ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime) { - List<SerializablePath> pathsToList = new LinkedList<>(); - pathsToList.add(new SerializablePath(new CachingPath(dataWriteConfig.getBasePath()))); + List<StoragePath> pathsToList = new LinkedList<>(); + pathsToList.add(new StoragePath(dataWriteConfig.getBasePath())); List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>(); final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism(); - SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf()); final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex(); final String datasetBasePath = dataMetaClient.getBasePathV2().toString(); - SerializablePath serializableBasePath = new SerializablePath(new CachingPath(datasetBasePath)); + StoragePath storageBasePath = new StoragePath(datasetBasePath); while (!pathsToList.isEmpty()) { // In each round we will list a section of directories @@ -609,9 +602,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // List all directories in parallel engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem"); List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> { - FileSystem fs = path.get().getFileSystem(conf.get()); - String relativeDirPath = FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get()); - return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()), initializationTime); + String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path); + return new DirectoryInfo(relativeDirPath, metadataMetaClient.getStorage().listDirectEntries(path), initializationTime); }, numDirsToList); pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size())); @@ -632,9 +624,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM partitionsToBootstrap.add(dirInfo); } else { // Add sub-dirs to the queue - pathsToList.addAll(dirInfo.getSubDirectories().stream() - .map(path -> new SerializablePath(new CachingPath(path.toUri()))) - .collect(Collectors.toList())); + pathsToList.addAll(dirInfo.getSubDirectories()); } } } @@ -651,14 +641,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime) throws IOException { List<DirectoryInfo> dirinfoList = new LinkedList<>(); List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream() - .map(partitionPath -> dataWriteConfig.getBasePath() + "/" + partitionPath).collect(Collectors.toList()); - Map<String, FileStatus[]> partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths) - .entrySet() - .stream() - .collect(Collectors.toMap(e -> e.getKey(), - e -> e.getValue().stream().map(status -> HadoopFSUtils.convertToHadoopFileStatus(status)) - .toArray(FileStatus[]::new))); - for (Map.Entry<String, FileStatus[]> entry : partitionFileMap.entrySet()) { + .map(partitionPath -> dataWriteConfig.getBasePath() + StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList()); + Map<String, List<StoragePathInfo>> partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths); + for (Map.Entry<String, List<StoragePathInfo>> entry : partitionFileMap.entrySet()) { dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), initializationTime)); } return dirinfoList; @@ -1495,31 +1480,31 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // Map of filenames within this partition to their respective sizes private final HashMap<String, Long> filenameToSizeMap; // List of directories within this partition - private final List<Path> subDirectories = new ArrayList<>(); + private final List<StoragePath> subDirectories = new ArrayList<>(); // Is this a hoodie partition private boolean isHoodiePartition = false; - public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String maxInstantTime) { + public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos, String maxInstantTime) { this.relativePath = relativePath; // Pre-allocate with the maximum length possible - filenameToSizeMap = new HashMap<>(fileStatus.length); + filenameToSizeMap = new HashMap<>(pathInfos.size()); - for (FileStatus status : fileStatus) { - if (status.isDirectory()) { + for (StoragePathInfo pathInfo : pathInfos) { + if (pathInfo.isDirectory()) { // Ignore .hoodie directory as there cannot be any partitions inside it - if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { - this.subDirectories.add(status.getPath()); + if (!pathInfo.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { + this.subDirectories.add(pathInfo.getPath()); } - } else if (status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { + } else if (pathInfo.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { // Presence of partition meta file implies this is a HUDI partition this.isHoodiePartition = true; - } else if (FSUtils.isDataFile(status.getPath())) { + } else if (FSUtils.isDataFile(pathInfo.getPath())) { // Regular HUDI data file (base file or log file) - String dataFileCommitTime = FSUtils.getCommitTime(status.getPath().getName()); + String dataFileCommitTime = FSUtils.getCommitTime(pathInfo.getPath().getName()); // Limit the file listings to files which were created before the maxInstant time. if (HoodieTimeline.compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, maxInstantTime)) { - filenameToSizeMap.put(status.getPath().getName(), status.getLen()); + filenameToSizeMap.put(pathInfo.getPath().getName(), pathInfo.getLength()); } } } @@ -1537,7 +1522,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM return isHoodiePartition; } - List<Path> getSubDirectories() { + List<StoragePath> getSubDirectories() { return subDirectories; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java index d6545f247b6..e8db1b3515d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java @@ -22,7 +22,7 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.common.util.Option; -import org.apache.hudi.hadoop.fs.CachingPath; +import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; @@ -43,7 +43,7 @@ public class SparkPartitionUtils { return HoodieSparkUtils.parsePartitionColumnValues( partitionFields.get(), partitionPath, - new CachingPath(basePath), + new StoragePath(basePath), AvroConversionUtils.convertAvroSchemaToStructType(writerSchema), hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()), sparkParsePartitionUtil, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java index 0d164f379fe..2a8c395d0d5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java @@ -34,14 +34,11 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieInsertException; -import org.apache.hudi.hadoop.fs.CachingPath; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkersFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; @@ -67,7 +64,7 @@ public class HoodieRowCreateHandle implements Serializable { private final HoodieWriteConfig writeConfig; private final String partitionPath; - private final Path path; + private final StoragePath path; private final String fileId; private final boolean populateMetaFields; @@ -116,12 +113,11 @@ public class HoodieRowCreateHandle implements Serializable { this.currTimer = HoodieTimer.start(); HoodieStorage storage = table.getMetaClient().getStorage(); - FileSystem fs = (FileSystem) storage.getFileSystem(); String writeToken = getWriteToken(taskPartitionId, taskId, taskEpochId); String fileName = FSUtils.makeBaseFileName(instantTime, writeToken, this.fileId, table.getBaseFileExtension()); - this.path = makeNewPath(fs, partitionPath, fileName, writeConfig); + this.path = makeNewPath(storage, partitionPath, fileName, writeConfig); this.populateMetaFields = writeConfig.populateMetaFields(); this.fileName = UTF8String.fromString(path.getName()); @@ -147,13 +143,12 @@ public class HoodieRowCreateHandle implements Serializable { createMarkerFile(partitionPath, fileName, instantTime, table, writeConfig); - this.fileWriter = HoodieInternalRowFileWriterFactory.getInternalRowFileWriter( - new StoragePath(path.toUri()), table, writeConfig, structType); + this.fileWriter = HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(path, table, writeConfig, structType); } catch (IOException e) { throw new HoodieInsertException("Failed to initialize file writer for path " + path, e); } - LOG.info("New handle created for partition: " + partitionPath + " with fileId " + fileId); + LOG.info("New handle created for partition: {} with fileId {}", partitionPath, fileId); } /** @@ -242,9 +237,8 @@ public class HoodieRowCreateHandle implements Serializable { stat.setNumInserts(writeStatus.getTotalRecords()); stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(fileId); - stat.setPath(new StoragePath(writeConfig.getBasePath()), new StoragePath(path.toUri())); - long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getStorage(), - new StoragePath(path.toUri())); + stat.setPath(new StoragePath(writeConfig.getBasePath()), path); + long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getStorage(), path); stat.setTotalWriteBytes(fileSizeInBytes); stat.setFileSizeInBytes(fileSizeInBytes); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); @@ -261,16 +255,16 @@ public class HoodieRowCreateHandle implements Serializable { return path.getName(); } - private static Path makeNewPath(FileSystem fs, String partitionPath, String fileName, HoodieWriteConfig writeConfig) { - Path path = FSUtils.constructAbsolutePathInHadoopPath(writeConfig.getBasePath(), partitionPath); + private static StoragePath makeNewPath(HoodieStorage storage, String partitionPath, String fileName, HoodieWriteConfig writeConfig) { + StoragePath path = new StoragePath(writeConfig.getBasePath(), partitionPath); try { - if (!fs.exists(path)) { - fs.mkdirs(path); // create a new partition as needed. + if (!storage.exists(path)) { + storage.createDirectory(path); // create a new partition as needed. } } catch (IOException e) { throw new HoodieIOException("Failed to make dir " + path, e); } - return new CachingPath(path.toString(), fileName); + return new StoragePath(path, fileName); } /** diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 3393da6bd83..7febf2a2ced 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -25,7 +25,7 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.hadoop.fs.CachingPath +import org.apache.hudi.storage.StoragePath import org.apache.hudi.util.ExceptionWrappingIterator import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging @@ -237,7 +237,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi def parsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String, - basePath: Path, + basePath: StoragePath, schema: StructType, timeZoneId: String, sparkParsePartitionUtil: SparkParsePartitionUtil, @@ -246,7 +246,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi // This is a non-partitioned table Array.empty } else { - val partitionFragments = partitionPath.split("/") + val partitionFragments = partitionPath.split(StoragePath.SEPARATOR) if (partitionFragments.length != partitionColumns.length) { if (partitionColumns.length == 1) { // If the partition column size is not equal to the partition fragment size @@ -290,9 +290,9 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi } else { partition } - }.mkString("/") + }.mkString(StoragePath.SEPARATOR) - val pathWithPartitionName = new CachingPath(basePath, CachingPath.createRelativePathUnsafe(partitionWithName)) + val pathWithPartitionName = new StoragePath(basePath, partitionWithName) val partitionSchema = StructType(schema.fields.filter(f => partitionColumns.contains(f.name))) val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema, timeZoneId, sparkParsePartitionUtil, basePath, shouldValidatePartitionCols) @@ -301,14 +301,14 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi } } - private def parsePartitionPath(partitionPath: Path, partitionSchema: StructType, timeZoneId: String, - sparkParsePartitionUtil: SparkParsePartitionUtil, basePath: Path, + private def parsePartitionPath(partitionPath: StoragePath, partitionSchema: StructType, timeZoneId: String, + sparkParsePartitionUtil: SparkParsePartitionUtil, basePath: StoragePath, shouldValidatePartitionCols: Boolean): Seq[Any] = { val partitionDataTypes = partitionSchema.map(f => f.name -> f.dataType).toMap sparkParsePartitionUtil.parsePartition( - partitionPath, + new Path(partitionPath.toUri), typeInference = false, - Set(basePath), + Set(new Path(basePath.toUri)), partitionDataTypes, getTimeZone(timeZoneId), validatePartitionValues = shouldValidatePartitionCols @@ -329,7 +329,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi partitionVals(index) = fragment.substring(fragment.indexOf("=") + 1) } else { - partitionVals(index) += "/" + fragment + partitionVals(index) += StoragePath.SEPARATOR + fragment } } return partitionVals diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 0685d8d4a88..0b6d8699631 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -567,6 +567,10 @@ public class FSUtils { return isBaseFile(path) || isLogFile(path); } + public static boolean isDataFile(StoragePath path) { + return isBaseFile(path) || isLogFile(path); + } + /** * Get the names of all the base and log files in the given partition path. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java index afae30ca8e2..ea6b8f429bd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java @@ -21,7 +21,7 @@ package org.apache.hudi.common.table.view; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.hadoop.fs.CachingPath; +import org.apache.hudi.storage.StoragePath; import java.util.Collections; import java.util.List; @@ -71,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView { Map<String, HoodieBaseFile> newFilesWrittenForPartition = filesWritten.stream() .filter(file -> partitionStr.equals(file.getPartitionPath())) .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat -> - new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, null))); + new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime, null))); Stream<HoodieBaseFile> committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr); Map<String, HoodieBaseFile> allFileIds = committedBaseFiles diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index d4ba0f714a9..c228d3db0ed 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -39,7 +39,7 @@ import org.apache.hudi.common.util.{ConfigUtils, StringUtils} import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException -import org.apache.hudi.hadoop.fs.{CachingPath, HadoopFSUtils} +import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} @@ -68,8 +68,6 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext, SparkSession} -import java.net.URI - import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} @@ -489,14 +487,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def getPartitionColumnsAsInternalRowInternal(file: StoragePathInfo, basePath: Path, extractPartitionValuesFromPartitionPath: Boolean): InternalRow = { if (extractPartitionValuesFromPartitionPath) { - val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(basePath) - val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(new Path(file.getPath.getParent.toUri)) - val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString + val tablePathWithoutScheme = new StoragePath(basePath.toUri).getPathWithoutSchemeAndAuthority + val partitionPathWithoutScheme = new StoragePath(file.getPath.getParent.toUri).getPathWithoutSchemeAndAuthority + val relativePath = tablePathWithoutScheme.toUri.relativize(partitionPathWithoutScheme.toUri).toString val timeZoneId = conf.get("timeZone", sparkSession.sessionState.conf.sessionLocalTimeZone) val rowValues = HoodieSparkUtils.parsePartitionColumnValues( partitionColumns, relativePath, - basePath, + new StoragePath(basePath.toUri), tableStructSchema, timeZoneId, sparkAdapter.getSparkParsePartitionUtil, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 5dabebefd7f..9655f2ae4e0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -401,7 +401,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, } protected def doParsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = { - HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, partitionPath, new Path(getBasePath.toUri), schema, + HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, partitionPath, getBasePath, schema, configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone), sparkParsePartitionUtil, shouldValidatePartitionColumns(spark)) } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 8a2ded37fd5..6265f0ba3db 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -103,7 +103,6 @@ import java.util.stream.Collectors; import scala.Tuple2; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; -import static org.apache.hudi.hadoop.fs.CachingPath.getPathWithoutSchemeAndAuthority; import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath; /** @@ -1244,8 +1243,8 @@ public class HoodieMetadataTableValidator implements Serializable { } private String getRelativePath(String basePath, String absoluteFilePath) { - String basePathStr = getPathWithoutSchemeAndAuthority(new Path(basePath)).toString(); - String absoluteFilePathStr = getPathWithoutSchemeAndAuthority(new Path(absoluteFilePath)).toString(); + String basePathStr = new StoragePath(basePath).getPathWithoutSchemeAndAuthority().toString(); + String absoluteFilePathStr = new StoragePath(absoluteFilePath).getPathWithoutSchemeAndAuthority().toString(); if (!absoluteFilePathStr.startsWith(basePathStr)) { throw new IllegalArgumentException("File path does not belong to the base path! basePath=" diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java index e7dca04bbe7..01c2ab7ef11 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java @@ -32,12 +32,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.hadoop.fs.CachingPath; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; @@ -80,7 +79,7 @@ public class SparkSampleWritesUtils { Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt, writeConfig, instantTime); if (result.getLeft()) { long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight()); - LOG.info("Overwriting record size estimate to " + avgSize); + LOG.info("Overwriting record size estimate to {}", avgSize); TypedProperties props = writeConfig.getProps(); props.put(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(avgSize)); return Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build()); @@ -121,7 +120,7 @@ public class SparkSampleWritesUtils { sampleWriteClient.startCommitWithTime(instantTime); JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime); if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) { - LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName())); + LOG.error("sample writes for table {} failed with errors.", writeConfig.getTableName()); if (LOG.isTraceEnabled()) { LOG.trace("Printing out the top 100 errors"); writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> { @@ -139,10 +138,10 @@ public class SparkSampleWritesUtils { } private static String getSampleWritesBasePath(JavaSparkContext jsc, HoodieWriteConfig writeConfig, String instantTime) throws IOException { - Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + StoragePath.SEPARATOR + instantTime); - FileSystem fs = HadoopFSUtils.getFs(basePath, jsc.hadoopConfiguration()); - if (fs.exists(basePath)) { - fs.delete(basePath, true); + StoragePath basePath = new StoragePath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + StoragePath.SEPARATOR + instantTime); + HoodieStorage storage = getMetaClient(jsc, writeConfig.getBasePath()).getStorage(); + if (storage.exists(basePath)) { + storage.deleteDirectory(basePath); } return basePath.toString(); }
