This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 38409228614 [HUDI-6628] Rely on methods in HoodieBaseFile and HoodieLogFile instead of FSUtils when possible (#9337)
38409228614 is described below
commit 38409228614ead671423092a2243d466fcce2c35
Author: Tim Brown <[email protected]>
AuthorDate: Fri Aug 4 08:45:22 2023 -0700
[HUDI-6628] Rely on methods in HoodieBaseFile and HoodieLogFile instead of FSUtils when possible (#9337)
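
    At a glance: the patch swaps per-call string parsing in FSUtils for accessors
    on the model objects, which parse the file name once and cache the fields.
    A minimal sketch of the new call pattern (illustrative only, not part of the
    patch; the file name mirrors the tests added in this commit):

        import org.apache.hudi.common.model.HoodieBaseFile;

        public class BaseFileAccessorSketch {
          public static void main(String[] args) {
            HoodieBaseFile baseFile = new HoodieBaseFile(
                "file:/tmp/hoodie/2021/01/01/136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet");
            // Before: callers re-parsed the name on every call, e.g.
            //   FSUtils.getFileId(baseFile.getFileName());
            //   FSUtils.getCommitTime(baseFile.getFileName());
            // After: the model object parses once, in its constructor.
            System.out.println(baseFile.getFileId());     // 136281f3-c24e-423b-a65a-95dbfbddce1d
            System.out.println(baseFile.getCommitTime()); // 100
          }
        }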
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 5 +-
.../apache/hudi/client/CompactionAdminClient.java | 7 +-
.../apache/hudi/index/bloom/HoodieBloomIndex.java | 17 ++--
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 4 +-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 2 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 24 ++---
.../table/action/commit/JavaUpsertPartitioner.java | 3 +-
.../index/bloom/HoodieFileProbingFunction.java | 10 +-
.../HoodieMetadataBloomFilterProbingFunction.java | 17 ++--
.../table/action/commit/UpsertPartitioner.java | 3 +-
.../SparkUpsertDeltaCommitPartitioner.java | 12 +--
.../java/org/apache/hudi/common/fs/FSUtils.java | 17 +---
.../org/apache/hudi/common/model/BaseFile.java | 2 +-
.../hudi/common/model/CompactionOperation.java | 4 +-
.../apache/hudi/common/model/HoodieBaseFile.java | 46 +++++++++-
.../apache/hudi/common/model/HoodieLogFile.java | 101 ++++++++++++++++-----
.../table/log/AbstractHoodieLogRecordReader.java | 6 +-
.../hudi/common/table/log/HoodieLogFileReader.java | 4 +-
.../view/HoodieTablePreCommitFileSystemView.java | 5 +-
.../hudi/common/model/TestHoodieBaseFile.java | 81 +++++++++++++++++
.../hudi/common/model/TestHoodieLogFile.java | 92 +++++++++++++++++++
.../partitioner/profile/DeltaWriteProfile.java | 10 +-
.../sink/partitioner/profile/WriteProfile.java | 4 +-
.../apache/hudi/hadoop/realtime/RealtimeSplit.java | 8 +-
.../realtime/TestHoodieRealtimeFileSplit.java | 3 +-
.../utilities/HoodieMetadataTableValidator.java | 2 +-
26 files changed, 375 insertions(+), 114 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 82f9c1a6468..cf36a704c7d 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.hadoop.CachingPath;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
@@ -232,9 +233,9 @@ public class HoodieLogFileCommand {
} else {
for (String logFile : logFilePaths) {
Schema writerSchema = new AvroSchemaConverter()
-              .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
+              .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile))));
         HoodieLogFormat.Reader reader =
-            HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
+            HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema);
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 8ff562c2070..257d2cd855c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.hadoop.fs.FileStatus;
@@ -244,8 +245,8 @@ public class CompactionAdminClient extends BaseHoodieClient {
         merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList());
     return logFilesToBeMoved.stream().map(lf -> {
       ValidationUtils.checkArgument(lf.getLogVersion() - maxVersion > 0, "Expect new log version to be sane");
-      HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
-          FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
+      HoodieLogFile newLogFile = new HoodieLogFile(new CachingPath(lf.getPath().getParent(),
+          FSUtils.makeLogFileName(lf.getFileId(), "." + lf.getFileExtension(),
               compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
       return Pair.of(lf, newLogFile);
     }).collect(Collectors.toList());
@@ -450,7 +451,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
         .orElse(fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getPath().getParent().toString()).get());
     for (HoodieLogFile toRepair : logFilesToRepair) {
       int version = maxUsedVersion + 1;
-      HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
+      HoodieLogFile newLf = new HoodieLogFile(new CachingPath(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
           logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
result.add(Pair.of(toRepair, newLf));
maxUsedVersion = version;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index fcd7135833f..ab7ccd1b49b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
@@ -47,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -214,12 +214,15 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
     String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable);
     // Partition and file name pairs
-    List<Pair<String, String>> partitionFileNameList =
-        HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
-            .map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName()))
-            .sorted()
-            .collect(toList());
+    List<Pair<String, String>> partitionFileNameList = new ArrayList<>(baseFilesForAllPartitions.size());
+    Map<Pair<String, String>, String> partitionAndFileNameToFileId = new HashMap<>(baseFilesForAllPartitions.size(), 1);
+    baseFilesForAllPartitions.forEach(pair -> {
+      Pair<String, String> partitionAndFileName = Pair.of(pair.getKey(), pair.getValue().getFileName());
+      partitionFileNameList.add(partitionAndFileName);
+      partitionAndFileNameToFileId.put(partitionAndFileName, pair.getValue().getFileId());
+    });
if (partitionFileNameList.isEmpty()) {
return Collections.emptyList();
@@ -233,7 +236,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
     for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
       result.add(Pair.of(entry.getKey().getLeft(),
           new BloomIndexFileInfo(
-              FSUtils.getFileId(entry.getKey().getRight()),
+              partitionAndFileNameToFileId.get(entry.getKey()),
               // NOTE: Here we assume that the type of the primary key field is string
               (String) unwrapAvroValueWrapper(entry.getValue().getMinValue()),
               (String) unwrapAvroValueWrapper(entry.getValue().getMaxValue())
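
The HoodieBloomIndex change above also threads a (partition, fileName) -> fileId map from the
listing step to the column-stats loop, so fileId comes from a map hit rather than another string
split. A toy sketch of that lookup structure (simplified types; java.util.Map.Entry stands in
for Hudi's Pair here, purely for illustration):

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.HashMap;
    import java.util.Map;

    public class FileIdLookupSketch {
      public static void main(String[] args) {
        // Filled once while listing base files: (partition, fileName) -> fileId.
        Map<Map.Entry<String, String>, String> partitionAndFileNameToFileId = new HashMap<>();
        String fileName = "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet";
        partitionAndFileNameToFileId.put(new SimpleImmutableEntry<>("2021/01/01", fileName),
            "136281f3-c24e-423b-a65a-95dbfbddce1d");
        // Later lookups are map hits instead of FSUtils.getFileId(fileName) string splits.
        System.out.println(partitionAndFileNameToFileId.get(new SimpleImmutableEntry<>("2021/01/01", fileName)));
      }
    }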
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 7aa357b7504..21c0059474e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -173,7 +173,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
       writeStatus.setStat(new HoodieWriteStat());
       try {
         String latestValidFilePath = baseFileToMerge.getFileName();
-        writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
+        writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
         HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
             new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
@@ -471,7 +471,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
           String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
               writeStatus.getFileId(), writeStatus.getPartitionPath(),
               instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
-              FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
+              baseFileToMerge.getCommitTime(), oldNumWrites));
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 9e716a280e8..81480767599 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -253,7 +253,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
         .withSizeThreshold(config.getLogFileMaxSize())
         .withFs(fs)
         .withRolloverLogWriteToken(writeToken)
-        .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
+        .withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken))
         .withSuffix(suffix)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 07aea4d934f..53ad933197e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -164,7 +165,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(hadoopConf);
     this.metrics = Option.empty();
-    this.enabledPartitionTypes = new ArrayList<>();
+    this.enabledPartitionTypes = new ArrayList<>(4);
     this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
@@ -481,10 +482,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // Collect the list of latest base files present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
     fsView.loadAllPartitions();
-    final List<Pair<String, String>> partitionBaseFilePairs = new ArrayList<>();
+    final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
     for (String partition : partitions) {
       partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
-          .map(basefile -> Pair.of(partition, basefile.getFileName())).collect(Collectors.toList()));
+          .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
     }
     LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
@@ -509,7 +510,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * Read the record keys from base files in partitions and return records.
    */
   private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
-                                                               List<Pair<String, String>> partitionBaseFilePairs,
+                                                               List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs,
                                                                boolean forDelete) {
     if (partitionBaseFilePairs.isEmpty()) {
       return engineContext.emptyHoodieData();
@@ -517,13 +518,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files");
     final int parallelism = Math.min(partitionBaseFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
-    return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(p -> {
-      final String partition = p.getKey();
-      final String filename = p.getValue();
+    return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+      final String filename = baseFile.getFileName();
       Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + Path.SEPARATOR + filename);
-      final String fileId = FSUtils.getFileId(filename);
-      final String instantTime = FSUtils.getCommitTime(filename);
+      final String fileId = baseFile.getFileId();
+      final String instantTime = baseFile.getCommitTime();
       HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), dataFilePath);
       ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
@@ -1370,10 +1372,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
     final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
-    List<Pair<String, String>> partitionBaseFilePairs = replaceCommitMetadata
+    List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
         .getPartitionToReplaceFileIds()
         .keySet().stream().flatMap(partition
-            -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f.getFileName())))
+            -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
         .collect(Collectors.toList());
     return readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, true);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
index 1c8ba1b175d..ad7111d70a2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
@@ -253,9 +253,8 @@ public class JavaUpsertPartitioner<T> implements Partitioner {
       for (HoodieBaseFile file : allFiles) {
         if (file.getFileSize() < config.getParquetSmallFileLimit()) {
-          String filename = file.getFileName();
           SmallFile sf = new SmallFile();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 43d3a8a8e2e..2b6a96b3d05 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -20,7 +20,6 @@ package org.apache.hudi.index.bloom;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -84,7 +83,7 @@ public class HoodieFileProbingFunction implements
     @Override
     protected List<HoodieKeyLookupResult> computeNext() {
       // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>();
+      final Map<Pair<String, HoodieBaseFile>, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>();
       final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
while (inputItr.hasNext()) {
@@ -103,7 +102,7 @@ public class HoodieFileProbingFunction implements
             fileIDBaseFileMap.put(fileId, baseFile.get());
           }
-          fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), entry._2);
+          fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)), entry._2);
           if (fileToLookupResults.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
             break;
@@ -116,12 +115,11 @@ public class HoodieFileProbingFunction implements
       return fileToLookupResults.entrySet().stream()
           .map(entry -> {
-            Pair<String, String> partitionPathFileNamePair = entry.getKey();
+            Pair<String, HoodieBaseFile> partitionPathFileNamePair = entry.getKey();
             HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = entry.getValue();
             final String partitionPath = partitionPathFileNamePair.getLeft();
-            final String fileName = partitionPathFileNamePair.getRight();
-            final String fileId = FSUtils.getFileId(fileName);
+            final String fileId = partitionPathFileNamePair.getRight().getFileId();
             ValidationUtils.checkState(!fileId.isEmpty());
             List<String> candidateRecordKeys = bloomFilterKeyLookupResult.getCandidateKeys();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
index 5dacad7320e..c96bd8740fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
@@ -20,7 +20,6 @@ package org.apache.hudi.index.bloom;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
@@ -44,6 +43,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import scala.Tuple2;
@@ -93,7 +93,7 @@ public class HoodieMetadataBloomFilterProbingFunction implements
     @Override
     protected Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> computeNext() {
       // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+      final Map<Pair<String, HoodieBaseFile>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
       final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
while (inputItr.hasNext()) {
@@ -110,7 +110,7 @@ public class HoodieMetadataBloomFilterProbingFunction implements
             fileIDBaseFileMap.put(fileId, baseFile.get());
           }
-          fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()),
+          fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)),
               k -> new ArrayList<>()).add(new HoodieKey(entry._2, partitionPath));
           if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
@@ -122,20 +122,19 @@ public class HoodieMetadataBloomFilterProbingFunction implements
         return Collections.emptyIterator();
       }
-      List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
+      List<Pair<String, String>> partitionNameFileNameList = fileToKeysMap.keySet().stream().map(pair -> Pair.of(pair.getLeft(), pair.getRight().getFileName())).collect(Collectors.toList());
       Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap = hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
       return fileToKeysMap.entrySet().stream()
           .map(entry -> {
-            Pair<String, String> partitionPathFileNamePair = entry.getKey();
             List<HoodieKey> hoodieKeyList = entry.getValue();
-
-            final String partitionPath = partitionPathFileNamePair.getLeft();
-            final String fileName = partitionPathFileNamePair.getRight();
-            final String fileId = FSUtils.getFileId(fileName);
+            final String partitionPath = entry.getKey().getLeft();
+            final HoodieBaseFile baseFile = entry.getKey().getRight();
+            final String fileId = baseFile.getFileId();
             ValidationUtils.checkState(!fileId.isEmpty());
+            Pair<String, String> partitionPathFileNamePair = Pair.of(partitionPath, baseFile.getFileName());
             if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
               throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
             }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index d98524fc38f..edd6d981d18 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -302,9 +302,8 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
       for (HoodieBaseFile file : allFiles) {
         if (file.getFileSize() < config.getParquetSmallFileLimit()) {
-          String filename = file.getFileName();
           SmallFile sf = new SmallFile();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
index a5dd9978939..dc8f267718f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
@@ -19,8 +19,8 @@
package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -32,6 +32,7 @@ import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.table.action.commit.UpsertPartitioner;
import javax.annotation.Nonnull;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -69,15 +70,14 @@ public class SparkUpsertDeltaCommitPartitioner<T> extends UpsertPartitioner<T> {
     for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
       SmallFile sf = new SmallFile();
       if (smallFileSlice.getBaseFile().isPresent()) {
-        // TODO : Move logic of file name, file id, base commit time handling inside file slice
-        String filename = smallFileSlice.getBaseFile().get().getFileName();
-        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        HoodieBaseFile baseFile = smallFileSlice.getBaseFile().get();
+        sf.location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId());
         sf.sizeBytes = getTotalFileSize(smallFileSlice);
         smallFileLocations.add(sf);
       } else {
         HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.location = new HoodieRecordLocation(logFile.getBaseCommitTime(),
+            logFile.getFileId());
         sf.sizeBytes = getTotalFileSize(smallFileSlice);
         smallFileLocations.add(sf);
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 234af8a7be3..455f80af497 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -193,9 +193,9 @@ public class FSUtils {
public static String getCommitTime(String fullFileName) {
try {
if (isLogFile(fullFileName)) {
- return fullFileName.split("_")[1].split("\\.")[0];
+ return fullFileName.split("_")[1].split("\\.", 2)[0];
}
- return fullFileName.split("_")[2].split("\\.")[0];
+ return fullFileName.split("_")[2].split("\\.", 2)[0];
} catch (ArrayIndexOutOfBoundsException e) {
      throw new HoodieException("Failed to get commit time from filename: " + fullFileName, e);
}
@@ -206,7 +206,7 @@ public class FSUtils {
}
public static String getFileId(String fullFileName) {
- return fullFileName.split("_")[0];
+ return fullFileName.split("_", 2)[0];
}
/**
@@ -454,15 +454,6 @@ public class FSUtils {
return Integer.parseInt(matcher.group(4));
}
- public static String getSuffixFromLogPath(Path path) {
- Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
- if (!matcher.find()) {
- throw new InvalidHoodiePathException(path, "LogFile");
- }
- String val = matcher.group(10);
- return val == null ? "" : val;
- }
-
  public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version, String writeToken) {
String suffix = (writeToken == null)
@@ -547,7 +538,7 @@ public class FSUtils {
         getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
     if (latestLogFile.isPresent()) {
       return Option
-          .of(Pair.of(latestLogFile.get().getLogVersion(), getWriteTokenFromLogPath(latestLogFile.get().getPath())));
+          .of(Pair.of(latestLogFile.get().getLogVersion(), latestLogFile.get().getLogWriteToken()));
}
return Option.empty();
}
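
The split-limit tweaks above are a small optimization: String.split with a limit of 2 stops
scanning after the first separator, and on multi-dot log-file names it avoids tokenizing the
tail. A quick self-contained check (the file name follows the log-file naming convention used
in the tests added below):

    import java.util.Arrays;

    public class SplitLimitSketch {
      public static void main(String[] args) {
        String name = ".136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1";
        // Unlimited split tokenizes every '.', allocating an entry per segment:
        System.out.println(Arrays.toString(name.split("_")[1].split("\\.")));
        // => [100, log, 2]
        // With limit 2 the scan stops after the first '.', and element [0] is unchanged:
        System.out.println(Arrays.toString(name.split("_")[1].split("\\.", 2)));
        // => [100, log.2]
      }
    }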
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
index 7c4c8977795..cfca6c50c75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
@@ -36,7 +36,7 @@ public class BaseFile implements Serializable {
private transient FileStatus fileStatus;
private final String fullPath;
- private final String fileName;
+ protected final String fileName;
private long fileLen;
public BaseFile(BaseFile dataFile) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
index 14f8f59b3da..861271b0630 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
@@ -73,8 +73,8 @@ public class CompactionOperation implements Serializable {
} else {
assert logFiles.size() > 0;
this.dataFileName = Option.empty();
-      this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
-      this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath()));
+      this.baseInstantTime = logFiles.get(0).getBaseCommitTime();
+      this.id = new HoodieFileGroupId(partitionPath, logFiles.get(0).getFileId());
this.dataFileCommitTime = Option.empty();
this.bootstrapFilePath = Option.empty();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
index 2c640bb1432..ed1c32698eb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.model;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FileStatus;
@@ -28,12 +27,19 @@ import org.apache.hadoop.fs.FileStatus;
* Supports APIs to get Hudi FileId, Commit Time and bootstrap file (if any).
*/
public class HoodieBaseFile extends BaseFile {
+ private static final long serialVersionUID = 1L;
+ private static final char UNDERSCORE = '_';
+ private static final char DOT = '.';
+ private final String fileId;
+ private final String commitTime;
private Option<BaseFile> bootstrapBaseFile;
public HoodieBaseFile(HoodieBaseFile dataFile) {
super(dataFile);
this.bootstrapBaseFile = dataFile.bootstrapBaseFile;
+ this.fileId = dataFile.getFileId();
+ this.commitTime = dataFile.getCommitTime();
}
public HoodieBaseFile(FileStatus fileStatus) {
@@ -43,6 +49,9 @@ public class HoodieBaseFile extends BaseFile {
public HoodieBaseFile(FileStatus fileStatus, BaseFile bootstrapBaseFile) {
super(fileStatus);
this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
+ String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName();
+ this.fileId = fileIdAndCommitTime[0];
+ this.commitTime = fileIdAndCommitTime[1];
}
public HoodieBaseFile(String filePath) {
@@ -52,14 +61,45 @@ public class HoodieBaseFile extends BaseFile {
public HoodieBaseFile(String filePath, BaseFile bootstrapBaseFile) {
super(filePath);
this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
+ String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName();
+ this.fileId = fileIdAndCommitTime[0];
+ this.commitTime = fileIdAndCommitTime[1];
+ }
+
+ /**
+ * Parses the file ID and commit time from the fileName.
+   * @return String array of size 2 with fileId as the first and commitTime as the second element.
+ */
+ private String[] getFileIdAndCommitTimeFromFileName() {
+ String[] values = new String[2];
+ short underscoreCount = 0;
+ short lastUnderscoreIndex = 0;
+ for (int i = 0; i < fileName.length(); i++) {
+ char c = fileName.charAt(i);
+ if (c == UNDERSCORE) {
+ if (underscoreCount == 0) {
+ values[0] = fileName.substring(0, i);
+ }
+ lastUnderscoreIndex = (short) i;
+ underscoreCount++;
+ } else if (c == DOT) {
+ if (underscoreCount == 2) {
+ values[1] = fileName.substring(lastUnderscoreIndex + 1, i);
+ return values;
+ }
+ }
+ }
+ // case where there is no '.' in file name (no file suffix like .parquet)
+ values[1] = fileName.substring(lastUnderscoreIndex + 1);
+ return values;
}
public String getFileId() {
- return FSUtils.getFileId(getFileName());
+ return fileId;
}
public String getCommitTime() {
- return FSUtils.getCommitTime(getFileName());
+ return commitTime;
}
public Option<BaseFile> getBootstrapBaseFile() {
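
The new HoodieBaseFile parser runs one pass over the name: fileId is the prefix before the first
underscore, commitTime the segment between the second underscore and the first dot after it,
falling back to end-of-string when the name carries no extension. A quick check of both branches
through the public getters (a sketch; the second, extension-less name is a hypothetical input
that exercises the fallback):

    import org.apache.hudi.common.model.HoodieBaseFile;

    public class NameParsingSketch {
      public static void main(String[] args) {
        HoodieBaseFile withExt = new HoodieBaseFile(
            "file:/tmp/hoodie/2021/01/01/136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet");
        System.out.println(withExt.getFileId() + " @ " + withExt.getCommitTime());
        // => 136281f3-c24e-423b-a65a-95dbfbddce1d @ 100
        // No '.' in the name: the parser falls back to the end of the string.
        HoodieBaseFile noExt = new HoodieBaseFile(
            "file:/tmp/hoodie/2021/01/01/136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100");
        System.out.println(noExt.getFileId() + " @ " + noExt.getCommitTime());
        // => same fileId and commit time, without an extension
      }
    }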
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index 988194964f7..ecfbd925dd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -19,6 +19,8 @@
package org.apache.hudi.common.model;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -28,6 +30,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.Objects;
+import java.util.regex.Matcher;
+
+import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;
/**
 * Abstracts a single log file. Contains methods to extract metadata like the fileId, version and extension from the log
@@ -46,65 +51,115 @@ public class HoodieLogFile implements Serializable {
   private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed();
private transient FileStatus fileStatus;
+ private transient Path path;
private final String pathStr;
+ private String fileId;
+ private String baseCommitTime;
+ private int logVersion;
+ private String logWriteToken;
+ private String fileExtension;
+ private String suffix;
private long fileLen;
public HoodieLogFile(HoodieLogFile logFile) {
- this.fileStatus = logFile.fileStatus;
+ this.fileStatus = logFile.getFileStatus();
+ this.path = logFile.getPath();
this.pathStr = logFile.pathStr;
- this.fileLen = logFile.fileLen;
+ this.fileId = logFile.getFileId();
+ this.baseCommitTime = logFile.getBaseCommitTime();
+ this.logVersion = logFile.getLogVersion();
+ this.logWriteToken = logFile.getLogWriteToken();
+ this.fileExtension = logFile.getFileExtension();
+ this.suffix = logFile.getSuffix();
+ this.fileLen = logFile.getFileSize();
}
public HoodieLogFile(FileStatus fileStatus) {
- this.fileStatus = fileStatus;
- this.pathStr = fileStatus.getPath().toString();
- this.fileLen = fileStatus.getLen();
+    this(fileStatus, fileStatus.getPath(), fileStatus.getPath().toString(), fileStatus.getLen());
}
public HoodieLogFile(Path logPath) {
- this.fileStatus = null;
- this.pathStr = logPath.toString();
- this.fileLen = -1;
+ this(null, logPath, logPath.toString(), -1);
}
- public HoodieLogFile(Path logPath, Long fileLen) {
- this.fileStatus = null;
- this.pathStr = logPath.toString();
- this.fileLen = fileLen;
+ public HoodieLogFile(Path logPath, long fileLen) {
+ this(null, logPath, logPath.toString(), fileLen);
}
public HoodieLogFile(String logPathStr) {
- this.fileStatus = null;
+ this(null, null, logPathStr, -1);
+ }
+
+  private HoodieLogFile(FileStatus fileStatus, Path logPath, String logPathStr, long fileLen) {
+ this.fileStatus = fileStatus;
this.pathStr = logPathStr;
- this.fileLen = -1;
+ this.fileLen = fileLen;
+ this.logVersion = -1; // mark version as uninitialized
+ if (logPath instanceof CachingPath) {
+ this.path = logPath;
+ }
+ }
+
+ private void parseFieldsFromPath() {
+ Matcher matcher = LOG_FILE_PATTERN.matcher(getPath().getName());
+ if (!matcher.find()) {
+ throw new InvalidHoodiePathException(path, "LogFile");
+ }
+ this.fileId = matcher.group(1);
+ this.baseCommitTime = matcher.group(2);
+ this.fileExtension = matcher.group(3);
+ this.logVersion = Integer.parseInt(matcher.group(4));
+ this.logWriteToken = matcher.group(6);
+ this.suffix = matcher.group(10) == null ? "" : matcher.group(10);
}
public String getFileId() {
- return FSUtils.getFileIdFromLogPath(getPath());
+ if (fileId == null) {
+ parseFieldsFromPath();
+ }
+ return fileId;
}
public String getBaseCommitTime() {
- return FSUtils.getBaseCommitTimeFromLogPath(getPath());
+ if (baseCommitTime == null) {
+ parseFieldsFromPath();
+ }
+ return baseCommitTime;
}
public int getLogVersion() {
- return FSUtils.getFileVersionFromLog(getPath());
+ if (logVersion == -1) {
+ parseFieldsFromPath();
+ }
+ return logVersion;
}
public String getLogWriteToken() {
- return FSUtils.getWriteTokenFromLogPath(getPath());
+ if (logWriteToken == null) {
+ parseFieldsFromPath();
+ }
+ return logWriteToken;
}
public String getFileExtension() {
- return FSUtils.getFileExtensionFromLog(getPath());
+ if (fileExtension == null) {
+ parseFieldsFromPath();
+ }
+ return fileExtension;
}
public String getSuffix() {
- return FSUtils.getSuffixFromLogPath(getPath());
+ if (suffix == null) {
+ parseFieldsFromPath();
+ }
+ return suffix;
}
public Path getPath() {
- return new Path(pathStr);
+ if (path == null) {
+ path = new CachingPath(pathStr);
+ }
+ return path;
}
public String getFileName() {
@@ -131,9 +186,9 @@ public class HoodieLogFile implements Serializable {
String fileId = getFileId();
String baseCommitTime = getBaseCommitTime();
Path path = getPath();
- String extension = "." + FSUtils.getFileExtensionFromLog(path);
+ String extension = "." + fileExtension;
     int newVersion = FSUtils.computeNextLogVersion(fs, path.getParent(), fileId, extension, baseCommitTime);
-    return new HoodieLogFile(new Path(path.getParent(),
+    return new HoodieLogFile(new CachingPath(path.getParent(),
         FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion, logWriteToken)));
}
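
HoodieLogFile takes the lazier route: fields start unset (logVersion uses -1 as the sentinel)
and the first getter call runs a single LOG_FILE_PATTERN match that fills all of them; the
CachingPath is likewise built only on demand. A sketch of the access pattern, using the log-file
name from the new test (the leading '.' marks the hidden log file):

    import org.apache.hudi.common.model.HoodieLogFile;

    public class LogFileAccessSketch {
      public static void main(String[] args) {
        // Name shape: .<fileId>_<baseCommitTime>.<ext>.<version>_<writeToken>
        HoodieLogFile logFile = new HoodieLogFile(
            "file:///tmp/hoodie/2021/01/01/.136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1");
        // The first getter triggers the one regex parse; later calls hit cached fields.
        System.out.println(logFile.getFileId());         // 136281f3-c24e-423b-a65a-95dbfbddce1d
        System.out.println(logFile.getBaseCommitTime()); // 100
        System.out.println(logFile.getLogVersion());     // 2
        System.out.println(logFile.getLogWriteToken());  // 1-0-1
        System.out.println(logFile.getFileExtension());  // log
      }
    }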
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 6ef1a6f5542..7b1e737610b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -39,13 +39,13 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -237,7 +237,7 @@ public abstract class AbstractHoodieLogRecordReader {
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-        logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+        logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
         readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
@@ -396,7 +396,7 @@ public abstract class AbstractHoodieLogRecordReader {
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-        logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+        logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
         readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index f268accb706..6759650af78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,7 +115,8 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
     //       is prefixed with an appropriate scheme given that we're not propagating the FS
     //       further
-    this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
+    Path updatedPath = FSUtils.makeQualified(fs, logFile.getPath());
+    this.logFile = updatedPath.equals(logFile.getPath()) ? logFile : new HoodieLogFile(updatedPath, logFile.getFileSize());
this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index b50b74e65df..1ff93327f7f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -21,8 +21,7 @@ package org.apache.hudi.common.table.view;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
import java.util.Collections;
import java.util.List;
@@ -72,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView {
     Map<String, HoodieBaseFile> newFilesWrittenForPartition = filesWritten.stream()
         .filter(file -> partitionStr.equals(file.getPartitionPath()))
         .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
-            new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString())));
+            new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(), writeStat.getPath()).toString())));
     Stream<HoodieBaseFile> committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
new file mode 100644
index 00000000000..15a93cd232b
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieBaseFile {
+  private final String fileName = "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet";
+ private final String pathStr = "file:/tmp/hoodie/2021/01/01/" + fileName;
+ private final String fileId = "136281f3-c24e-423b-a65a-95dbfbddce1d";
+ private final String baseCommitTime = "100";
+ private final int length = 10;
+
+ @Test
+ void createFromHoodieBaseFile() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus);
+    assertFileGetters(fileStatus, new HoodieBaseFile(hoodieBaseFile), length, Option.empty());
+ }
+
+ @Test
+ void createFromFileStatus() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+ HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus);
+ assertFileGetters(fileStatus, hoodieBaseFile, length, Option.empty());
+ }
+
+ @Test
+ void createFromFileStatusAndBootstrapBaseFile() {
+ HoodieBaseFile bootstrapBaseFile = new HoodieBaseFile(pathStr);
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus, bootstrapBaseFile);
+    assertFileGetters(fileStatus, hoodieBaseFile, length, Option.of(bootstrapBaseFile));
+ }
+
+ @Test
+ void createFromFilePath() {
+ HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(pathStr);
+ assertFileGetters(null, hoodieBaseFile, -1, Option.empty());
+ }
+
+ @Test
+ void createFromFilePathAndBootstrapBaseFile() {
+ HoodieBaseFile bootstrapBaseFile = new HoodieBaseFile(pathStr);
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(pathStr, bootstrapBaseFile);
+ assertFileGetters(null, hoodieBaseFile, -1, Option.of(bootstrapBaseFile));
+ }
+
+  private void assertFileGetters(FileStatus fileStatus, HoodieBaseFile hoodieBaseFile, long fileLength, Option<HoodieBaseFile> bootstrapBaseFile) {
+ assertEquals(fileId, hoodieBaseFile.getFileId());
+ assertEquals(baseCommitTime, hoodieBaseFile.getCommitTime());
+ assertEquals(bootstrapBaseFile, hoodieBaseFile.getBootstrapBaseFile());
+ assertEquals(fileName, hoodieBaseFile.getFileName());
+ assertEquals(pathStr, hoodieBaseFile.getPath());
+ assertEquals(new Path(pathStr), hoodieBaseFile.getHadoopPath());
+ assertEquals(fileLength, hoodieBaseFile.getFileSize());
+ assertEquals(fileStatus, hoodieBaseFile.getFileStatus());
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
new file mode 100644
index 00000000000..1096d222ad9
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieLogFile {
+  private final String pathStr = "file:///tmp/hoodie/2021/01/01/.136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1";
+ private final String fileId = "136281f3-c24e-423b-a65a-95dbfbddce1d";
+ private final String baseCommitTime = "100";
+ private final int logVersion = 2;
+ private final String writeToken = "1-0-1";
+ private final String fileExtension = "log";
+
+ private final int length = 10;
+
+ @Test
+ void createFromLogFile() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+ HoodieLogFile hoodieLogFile = new HoodieLogFile(fileStatus);
+ assertFileGetters(fileStatus, new HoodieLogFile(hoodieLogFile), length);
+ }
+
+ @Test
+ void createFromFileStatus() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+ HoodieLogFile hoodieLogFile = new HoodieLogFile(fileStatus);
+ assertFileGetters(fileStatus, hoodieLogFile, length);
+ }
+
+ @Test
+ void createFromPath() {
+ HoodieLogFile hoodieLogFile = new HoodieLogFile(new Path(pathStr));
+ assertFileGetters(null, hoodieLogFile, -1);
+ }
+
+ @Test
+ void createFromPathAndLength() {
+ HoodieLogFile hoodieLogFile = new HoodieLogFile(new Path(pathStr), length);
+ assertFileGetters(null, hoodieLogFile, length);
+ }
+
+ @Test
+ void createFromString() {
+ HoodieLogFile hoodieLogFile = new HoodieLogFile(pathStr);
+ assertFileGetters(null, hoodieLogFile, -1);
+ }
+
+ @Test
+ void createFromStringWithSuffix() {
+ String suffix = ".cdc";
+ String pathWithSuffix = pathStr + suffix;
+ HoodieLogFile hoodieLogFile = new HoodieLogFile(pathWithSuffix);
+ assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1, suffix);
+ }
+
+  private void assertFileGetters(FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength) {
+ assertFileGetters(pathStr, fileStatus, hoodieLogFile, fileLength, "");
+ }
+
+  private void assertFileGetters(String pathStr, FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength, String suffix) {
+ assertEquals(fileId, hoodieLogFile.getFileId());
+ assertEquals(baseCommitTime, hoodieLogFile.getBaseCommitTime());
+ assertEquals(logVersion, hoodieLogFile.getLogVersion());
+ assertEquals(writeToken, hoodieLogFile.getLogWriteToken());
+ assertEquals(fileExtension, hoodieLogFile.getFileExtension());
+ assertEquals(new Path(pathStr), hoodieLogFile.getPath());
+ assertEquals(fileLength, hoodieLogFile.getFileSize());
+ assertEquals(fileStatus, hoodieLogFile.getFileStatus());
+ assertEquals(suffix, hoodieLogFile.getSuffix());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index d63696effba..894a6463a44 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -19,8 +19,8 @@
package org.apache.hudi.sink.partitioner.profile;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -70,16 +70,14 @@ public class DeltaWriteProfile extends WriteProfile {
     for (FileSlice smallFileSlice : allSmallFileSlices) {
       SmallFile sf = new SmallFile();
       if (smallFileSlice.getBaseFile().isPresent()) {
-        // TODO : Move logic of file name, file id, base commit time handling inside file slice
-        String filename = smallFileSlice.getBaseFile().get().getFileName();
-        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        HoodieBaseFile baseFile = smallFileSlice.getBaseFile().get();
+        sf.location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId());
         sf.sizeBytes = getTotalFileSize(smallFileSlice);
         smallFileLocations.add(sf);
       } else {
         smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> {
           // in case there is something error, and the file slice has no log file
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
+          sf.location = new HoodieRecordLocation(logFile.getBaseCommitTime(), logFile.getFileId());
           sf.sizeBytes = getTotalFileSize(smallFileSlice);
           smallFileLocations.add(sf);
         });
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index 9dd604a717f..1f41888ff45 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.partitioner.profile;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -210,9 +209,8 @@ public class WriteProfile {
for (HoodieBaseFile file : allFiles) {
// filter out the corrupted files.
       if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
-        String filename = file.getFileName();
         SmallFile sf = new SmallFile();
-        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
index 4b0b2d6ea79..043122fbdf8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
@@ -18,12 +18,14 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.InputSplitUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -126,7 +128,7 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
     for (int i = 0; i < totalLogFiles; i++) {
       String logFilePath = InputSplitUtils.readString(in);
       long logFileSize = in.readLong();
-      deltaLogPaths.add(new HoodieLogFile(new Path(logFilePath), logFileSize));
+      deltaLogPaths.add(new HoodieLogFile(new CachingPath(logFilePath), logFileSize));
     }
setDeltaLogFiles(deltaLogPaths);
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
index d2c4f1be61d..4b0f379aedb 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
@@ -70,7 +70,8 @@ public class TestHoodieRealtimeFileSplit {
@BeforeEach
public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception {
basePath = tempDir.toAbsolutePath().toString();
-    deltaLogFiles = Collections.singletonList(new HoodieLogFile(new Path(basePath + "/1.log"), 0L));
+    Path logPath = new Path(basePath + "/1.log");
+    deltaLogFiles = Collections.singletonList(new HoodieLogFile(logPath, 0L));
     deltaLogPaths = Collections.singletonList(basePath + "/1.log");
     fileSplitName = basePath + "/test.file";
     baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 16c0b011479..d79957c735f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -875,7 +875,7 @@ public class HoodieMetadataTableValidator implements Serializable {
       }
       Schema readerSchema = converter.convert(messageType);
       reader =
-          HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema);
+          HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema);
// read the avro blocks
if (reader.hasNext()) {
HoodieLogBlock block = reader.next();