This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 95cdfa010a9 [HUDI-7631] Clean up usage of CachingPath outside hudi-common module (#11059)
95cdfa010a9 is described below
commit 95cdfa010a93c6c5f6d3b85263eb7f2433f9f8a6
Author: Vova Kolmakov <[email protected]>
AuthorDate: Sun Apr 21 11:58:48 2024 +0700
[HUDI-7631] Clean up usage of CachingPath outside hudi-common module (#11059)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 57 ++++++++--------------
.../hudi/client/utils/SparkPartitionUtils.java | 4 +-
.../hudi/io/storage/row/HoodieRowCreateHandle.java | 28 +++++------
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 20 ++++----
.../java/org/apache/hudi/common/fs/FSUtils.java | 4 ++
.../view/HoodieTablePreCommitFileSystemView.java | 4 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 11 ++---
.../apache/hudi/SparkHoodieTableFileIndex.scala | 2 +-
.../utilities/HoodieMetadataTableValidator.java | 5 +-
.../utilities/streamer/SparkSampleWritesUtils.java | 15 +++---
10 files changed, 65 insertions(+), 85 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1fa5877c353..f20643a9e95 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -69,9 +69,6 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.hadoop.fs.CachingPath;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.hadoop.fs.SerializablePath;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -80,9 +77,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -669,15 +663,14 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromFilesystem(String
initializationTime) {
- List<SerializablePath> pathsToList = new LinkedList<>();
- pathsToList.add(new SerializablePath(new
CachingPath(dataWriteConfig.getBasePath())));
+ List<StoragePath> pathsToList = new LinkedList<>();
+ pathsToList.add(new StoragePath(dataWriteConfig.getBasePath()));
List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
final int fileListingParallelism =
metadataWriteConfig.getFileListingParallelism();
- SerializableConfiguration conf = new
SerializableConfiguration(dataMetaClient.getHadoopConf());
final String dirFilterRegex =
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
final String datasetBasePath = dataMetaClient.getBasePathV2().toString();
- SerializablePath serializableBasePath = new SerializablePath(new
CachingPath(datasetBasePath));
+ StoragePath storageBasePath = new StoragePath(datasetBasePath);
while (!pathsToList.isEmpty()) {
// In each round we will list a section of directories
@@ -685,9 +678,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// List all directories in parallel
engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " +
numDirsToList + " partitions from filesystem");
List<DirectoryInfo> processedDirectories =
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
- FileSystem fs = path.get().getFileSystem(conf.get());
- String relativeDirPath =
FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get());
- return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()),
initializationTime);
+ String relativeDirPath =
FSUtils.getRelativePartitionPath(storageBasePath, path);
+ return new DirectoryInfo(relativeDirPath,
metadataMetaClient.getStorage().listDirectEntries(path), initializationTime);
}, numDirsToList);
pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList,
pathsToList.size()));
@@ -708,9 +700,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
partitionsToBootstrap.add(dirInfo);
} else {
// Add sub-dirs to the queue
- pathsToList.addAll(dirInfo.getSubDirectories().stream()
- .map(path -> new SerializablePath(new CachingPath(path.toUri())))
- .collect(Collectors.toList()));
+ pathsToList.addAll(dirInfo.getSubDirectories());
}
}
}
@@ -727,14 +717,9 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
private List<DirectoryInfo> listAllPartitionsFromMDT(String
initializationTime) throws IOException {
List<DirectoryInfo> dirinfoList = new LinkedList<>();
List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
- .map(partitionPath -> dataWriteConfig.getBasePath() + "/" +
partitionPath).collect(Collectors.toList());
- Map<String, FileStatus[]> partitionFileMap =
metadata.getAllFilesInPartitions(allPartitionPaths)
- .entrySet()
- .stream()
- .collect(Collectors.toMap(e -> e.getKey(),
- e -> e.getValue().stream().map(status ->
HadoopFSUtils.convertToHadoopFileStatus(status))
- .toArray(FileStatus[]::new)));
- for (Map.Entry<String, FileStatus[]> entry : partitionFileMap.entrySet()) {
+ .map(partitionPath -> dataWriteConfig.getBasePath() +
StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
+ Map<String, List<StoragePathInfo>> partitionFileMap =
metadata.getAllFilesInPartitions(allPartitionPaths);
+ for (Map.Entry<String, List<StoragePathInfo>> entry :
partitionFileMap.entrySet()) {
dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(),
initializationTime));
}
return dirinfoList;
@@ -1605,31 +1590,31 @@ public abstract class
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
// Map of filenames within this partition to their respective sizes
private final HashMap<String, Long> filenameToSizeMap;
// List of directories within this partition
- private final List<Path> subDirectories = new ArrayList<>();
+ private final List<StoragePath> subDirectories = new ArrayList<>();
// Is this a hoodie partition
private boolean isHoodiePartition = false;
- public DirectoryInfo(String relativePath, FileStatus[] fileStatus, String
maxInstantTime) {
+ public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime) {
this.relativePath = relativePath;
// Pre-allocate with the maximum length possible
- filenameToSizeMap = new HashMap<>(fileStatus.length);
+ filenameToSizeMap = new HashMap<>(pathInfos.size());
- for (FileStatus status : fileStatus) {
- if (status.isDirectory()) {
+ for (StoragePathInfo pathInfo : pathInfos) {
+ if (pathInfo.isDirectory()) {
// Ignore .hoodie directory as there cannot be any partitions inside
it
- if
(!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
- this.subDirectories.add(status.getPath());
+ if
(!pathInfo.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+ this.subDirectories.add(pathInfo.getPath());
}
- } else if
(status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
{
+ } else if
(pathInfo.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
{
// Presence of partition meta file implies this is a HUDI partition
this.isHoodiePartition = true;
- } else if (FSUtils.isDataFile(status.getPath())) {
+ } else if (FSUtils.isDataFile(pathInfo.getPath())) {
// Regular HUDI data file (base file or log file)
- String dataFileCommitTime =
FSUtils.getCommitTime(status.getPath().getName());
+ String dataFileCommitTime =
FSUtils.getCommitTime(pathInfo.getPath().getName());
// Limit the file listings to files which were created before the
maxInstant time.
if (HoodieTimeline.compareTimestamps(dataFileCommitTime,
LESSER_THAN_OR_EQUALS, maxInstantTime)) {
- filenameToSizeMap.put(status.getPath().getName(), status.getLen());
+ filenameToSizeMap.put(pathInfo.getPath().getName(),
pathInfo.getLength());
}
}
}
@@ -1647,7 +1632,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
return isHoodiePartition;
}
- List<Path> getSubDirectories() {
+ List<StoragePath> getSubDirectories() {
return subDirectories;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
index d6545f247b6..e8db1b3515d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
@@ -22,7 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hadoop.fs.CachingPath;
+import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
@@ -43,7 +43,7 @@ public class SparkPartitionUtils {
return HoodieSparkUtils.parsePartitionColumnValues(
partitionFields.get(),
partitionPath,
- new CachingPath(basePath),
+ new StoragePath(basePath),
AvroConversionUtils.convertAvroSchemaToStructType(writerSchema),
hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
sparkParsePartitionUtil,
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 0d164f379fe..2a8c395d0d5 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -34,14 +34,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.hadoop.fs.CachingPath;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
@@ -67,7 +64,7 @@ public class HoodieRowCreateHandle implements Serializable {
private final HoodieWriteConfig writeConfig;
private final String partitionPath;
- private final Path path;
+ private final StoragePath path;
private final String fileId;
private final boolean populateMetaFields;
@@ -116,12 +113,11 @@ public class HoodieRowCreateHandle implements
Serializable {
this.currTimer = HoodieTimer.start();
HoodieStorage storage = table.getMetaClient().getStorage();
- FileSystem fs = (FileSystem) storage.getFileSystem();
String writeToken = getWriteToken(taskPartitionId, taskId, taskEpochId);
String fileName = FSUtils.makeBaseFileName(instantTime, writeToken,
this.fileId,
table.getBaseFileExtension());
- this.path = makeNewPath(fs, partitionPath, fileName, writeConfig);
+ this.path = makeNewPath(storage, partitionPath, fileName, writeConfig);
this.populateMetaFields = writeConfig.populateMetaFields();
this.fileName = UTF8String.fromString(path.getName());
@@ -147,13 +143,12 @@ public class HoodieRowCreateHandle implements
Serializable {
createMarkerFile(partitionPath, fileName, instantTime, table,
writeConfig);
- this.fileWriter =
HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(
- new StoragePath(path.toUri()), table, writeConfig, structType);
+ this.fileWriter =
HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(path, table,
writeConfig, structType);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize file writer for
path " + path, e);
}
- LOG.info("New handle created for partition: " + partitionPath + " with
fileId " + fileId);
+ LOG.info("New handle created for partition: {} with fileId {}",
partitionPath, fileId);
}
/**
@@ -242,9 +237,8 @@ public class HoodieRowCreateHandle implements Serializable {
stat.setNumInserts(writeStatus.getTotalRecords());
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(fileId);
- stat.setPath(new StoragePath(writeConfig.getBasePath()), new
StoragePath(path.toUri()));
- long fileSizeInBytes =
FSUtils.getFileSize(table.getMetaClient().getStorage(),
- new StoragePath(path.toUri()));
+ stat.setPath(new StoragePath(writeConfig.getBasePath()), path);
+ long fileSizeInBytes =
FSUtils.getFileSize(table.getMetaClient().getStorage(), path);
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
@@ -261,16 +255,16 @@ public class HoodieRowCreateHandle implements
Serializable {
return path.getName();
}
- private static Path makeNewPath(FileSystem fs, String partitionPath, String
fileName, HoodieWriteConfig writeConfig) {
- Path path =
FSUtils.constructAbsolutePathInHadoopPath(writeConfig.getBasePath(),
partitionPath);
+ private static StoragePath makeNewPath(HoodieStorage storage, String
partitionPath, String fileName, HoodieWriteConfig writeConfig) {
+ StoragePath path = new StoragePath(writeConfig.getBasePath(),
partitionPath);
try {
- if (!fs.exists(path)) {
- fs.mkdirs(path); // create a new partition as needed.
+ if (!storage.exists(path)) {
+ storage.createDirectory(path); // create a new partition as needed.
}
} catch (IOException e) {
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new CachingPath(path.toString(), fileName);
+ return new StoragePath(path, fileName);
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 3393da6bd83..7febf2a2ced 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.hadoop.fs.CachingPath
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.util.ExceptionWrappingIterator
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
@@ -237,7 +237,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
def parsePartitionColumnValues(partitionColumns: Array[String],
partitionPath: String,
- basePath: Path,
+ basePath: StoragePath,
schema: StructType,
timeZoneId: String,
sparkParsePartitionUtil:
SparkParsePartitionUtil,
@@ -246,7 +246,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
// This is a non-partitioned table
Array.empty
} else {
- val partitionFragments = partitionPath.split("/")
+ val partitionFragments = partitionPath.split(StoragePath.SEPARATOR)
if (partitionFragments.length != partitionColumns.length) {
if (partitionColumns.length == 1) {
// If the partition column size is not equal to the partition
fragment size
@@ -290,9 +290,9 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
} else {
partition
}
- }.mkString("/")
+ }.mkString(StoragePath.SEPARATOR)
- val pathWithPartitionName = new CachingPath(basePath,
CachingPath.createRelativePathUnsafe(partitionWithName))
+ val pathWithPartitionName = new StoragePath(basePath,
partitionWithName)
val partitionSchema = StructType(schema.fields.filter(f =>
partitionColumns.contains(f.name)))
val partitionValues = parsePartitionPath(pathWithPartitionName,
partitionSchema, timeZoneId,
sparkParsePartitionUtil, basePath, shouldValidatePartitionCols)
@@ -301,14 +301,14 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
}
}
- private def parsePartitionPath(partitionPath: Path, partitionSchema:
StructType, timeZoneId: String,
- sparkParsePartitionUtil:
SparkParsePartitionUtil, basePath: Path,
+ private def parsePartitionPath(partitionPath: StoragePath, partitionSchema:
StructType, timeZoneId: String,
+ sparkParsePartitionUtil:
SparkParsePartitionUtil, basePath: StoragePath,
shouldValidatePartitionCols: Boolean):
Seq[Any] = {
val partitionDataTypes = partitionSchema.map(f => f.name ->
f.dataType).toMap
sparkParsePartitionUtil.parsePartition(
- partitionPath,
+ new Path(partitionPath.toUri),
typeInference = false,
- Set(basePath),
+ Set(new Path(basePath.toUri)),
partitionDataTypes,
getTimeZone(timeZoneId),
validatePartitionValues = shouldValidatePartitionCols
@@ -329,7 +329,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
partitionVals(index) = fragment.substring(fragment.indexOf("=") + 1)
} else {
- partitionVals(index) += "/" + fragment
+ partitionVals(index) += StoragePath.SEPARATOR + fragment
}
}
return partitionVals
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 4eeee2d8ba5..afd28f0ebac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -510,6 +510,10 @@ public class FSUtils {
return isBaseFile(path) || isLogFile(path);
}
+ public static boolean isDataFile(StoragePath path) {
+ return isBaseFile(path) || isLogFile(path);
+ }
+
/**
* Get the names of all the base and log files in the given partition path.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index afae30ca8e2..ea6b8f429bd 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -21,7 +21,7 @@ package org.apache.hudi.common.table.view;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.hadoop.fs.CachingPath;
+import org.apache.hudi.storage.StoragePath;
import java.util.Collections;
import java.util.List;
@@ -71,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView {
Map<String, HoodieBaseFile> newFilesWrittenForPartition =
filesWritten.stream()
.filter(file -> partitionStr.equals(file.getPartitionPath()))
.collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
- new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(),
writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime,
null)));
+ new HoodieBaseFile(new StoragePath(tableMetaClient.getBasePath(),
writeStat.getPath()).toString(), writeStat.getFileId(), preCommitInstantTime,
null)));
Stream<HoodieBaseFile> committedBaseFiles =
this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f8784192e41..b898a71c38b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -39,7 +39,7 @@ import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.hadoop.fs.{CachingPath, HadoopFSUtils}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -68,7 +68,6 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter,
PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
-import java.net.URI
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
@@ -492,14 +491,14 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected def getPartitionColumnsAsInternalRowInternal(file:
StoragePathInfo, basePath: Path,
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
if (extractPartitionValuesFromPartitionPath) {
- val tablePathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(basePath)
- val partitionPathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(new
Path(file.getPath.getParent.toUri))
- val relativePath = new
URI(tablePathWithoutScheme.toString).relativize(new
URI(partitionPathWithoutScheme.toString)).toString
+ val tablePathWithoutScheme = new
StoragePath(basePath.toUri).getPathWithoutSchemeAndAuthority
+ val partitionPathWithoutScheme = new
StoragePath(file.getPath.getParent.toUri).getPathWithoutSchemeAndAuthority
+ val relativePath =
tablePathWithoutScheme.toUri.relativize(partitionPathWithoutScheme.toUri).toString
val timeZoneId = conf.get("timeZone",
sparkSession.sessionState.conf.sessionLocalTimeZone)
val rowValues = HoodieSparkUtils.parsePartitionColumnValues(
partitionColumns,
relativePath,
- basePath,
+ new StoragePath(basePath.toUri),
tableStructSchema,
timeZoneId,
sparkAdapter.getSparkParsePartitionUtil,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 02b72de0cb9..c0d5864c73d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -401,7 +401,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
}
protected def doParsePartitionColumnValues(partitionColumns: Array[String],
partitionPath: String): Array[Object] = {
- HoodieSparkUtils.parsePartitionColumnValues(partitionColumns,
partitionPath, new Path(getBasePath.toUri), schema,
+ HoodieSparkUtils.parsePartitionColumnValues(partitionColumns,
partitionPath, getBasePath, schema,
configProperties.getString(DateTimeUtils.TIMEZONE_OPTION,
SQLConf.get.sessionLocalTimeZone),
sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b5b2d7c0485..75ff9a41fc0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -103,7 +103,6 @@ import java.util.stream.Collectors;
import scala.Tuple2;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static
org.apache.hudi.hadoop.fs.CachingPath.getPathWithoutSchemeAndAuthority;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
/**
@@ -1244,8 +1243,8 @@ public class HoodieMetadataTableValidator implements
Serializable {
}
private String getRelativePath(String basePath, String absoluteFilePath) {
- String basePathStr = getPathWithoutSchemeAndAuthority(new
Path(basePath)).toString();
- String absoluteFilePathStr = getPathWithoutSchemeAndAuthority(new
Path(absoluteFilePath)).toString();
+ String basePathStr = new
StoragePath(basePath).getPathWithoutSchemeAndAuthority().toString();
+ String absoluteFilePathStr = new
StoragePath(absoluteFilePath).getPathWithoutSchemeAndAuthority().toString();
if (!absoluteFilePathStr.startsWith(basePathStr)) {
throw new IllegalArgumentException("File path does not belong to the
base path! basePath="
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
index e7dca04bbe7..01c2ab7ef11 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
@@ -32,12 +32,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.fs.CachingPath;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
@@ -80,7 +79,7 @@ public class SparkSampleWritesUtils {
Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt,
writeConfig, instantTime);
if (result.getLeft()) {
long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
- LOG.info("Overwriting record size estimate to " + avgSize);
+ LOG.info("Overwriting record size estimate to {}", avgSize);
TypedProperties props = writeConfig.getProps();
props.put(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
String.valueOf(avgSize));
return
Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build());
@@ -121,7 +120,7 @@ public class SparkSampleWritesUtils {
sampleWriteClient.startCommitWithTime(instantTime);
JavaRDD<WriteStatus> writeStatusRDD =
sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
- LOG.error(String.format("sample writes for table %s failed with
errors.", writeConfig.getTableName()));
+ LOG.error("sample writes for table {} failed with errors.",
writeConfig.getTableName());
if (LOG.isTraceEnabled()) {
LOG.trace("Printing out the top 100 errors");
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws
-> {
@@ -139,10 +138,10 @@ public class SparkSampleWritesUtils {
}
private static String getSampleWritesBasePath(JavaSparkContext jsc,
HoodieWriteConfig writeConfig, String instantTime) throws IOException {
- Path basePath = new CachingPath(writeConfig.getBasePath(),
SAMPLE_WRITES_FOLDER_PATH + StoragePath.SEPARATOR + instantTime);
- FileSystem fs = HadoopFSUtils.getFs(basePath, jsc.hadoopConfiguration());
- if (fs.exists(basePath)) {
- fs.delete(basePath, true);
+ StoragePath basePath = new StoragePath(writeConfig.getBasePath(),
SAMPLE_WRITES_FOLDER_PATH + StoragePath.SEPARATOR + instantTime);
+ HoodieStorage storage = getMetaClient(jsc,
writeConfig.getBasePath()).getStorage();
+ if (storage.exists(basePath)) {
+ storage.deleteDirectory(basePath);
}
return basePath.toString();
}