This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 38409228614 [HUDI-6628] Rely on methods in HoodieBaseFile and HoodieLogFile instead of FSUtils when possible (#9337)
38409228614 is described below

commit 38409228614ead671423092a2243d466fcce2c35
Author: Tim Brown <[email protected]>
AuthorDate: Fri Aug 4 08:45:22 2023 -0700

    [HUDI-6628] Rely on methods in HoodieBaseFile and HoodieLogFile instead of FSUtils when possible (#9337)
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |   5 +-
 .../apache/hudi/client/CompactionAdminClient.java  |   7 +-
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  17 ++--
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   4 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  24 ++---
 .../table/action/commit/JavaUpsertPartitioner.java |   3 +-
 .../index/bloom/HoodieFileProbingFunction.java     |  10 +-
 .../HoodieMetadataBloomFilterProbingFunction.java  |  17 ++--
 .../table/action/commit/UpsertPartitioner.java     |   3 +-
 .../SparkUpsertDeltaCommitPartitioner.java         |  12 +--
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  17 +---
 .../org/apache/hudi/common/model/BaseFile.java     |   2 +-
 .../hudi/common/model/CompactionOperation.java     |   4 +-
 .../apache/hudi/common/model/HoodieBaseFile.java   |  46 +++++++++-
 .../apache/hudi/common/model/HoodieLogFile.java    | 101 ++++++++++++++++-----
 .../table/log/AbstractHoodieLogRecordReader.java   |   6 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |   4 +-
 .../view/HoodieTablePreCommitFileSystemView.java   |   5 +-
 .../hudi/common/model/TestHoodieBaseFile.java      |  81 +++++++++++++++++
 .../hudi/common/model/TestHoodieLogFile.java       |  92 +++++++++++++++++++
 .../partitioner/profile/DeltaWriteProfile.java     |  10 +-
 .../sink/partitioner/profile/WriteProfile.java     |   4 +-
 .../apache/hudi/hadoop/realtime/RealtimeSplit.java |   8 +-
 .../realtime/TestHoodieRealtimeFileSplit.java      |   3 +-
 .../utilities/HoodieMetadataTableValidator.java    |   2 +-
 26 files changed, 375 insertions(+), 114 deletions(-)
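
The common thread in this patch: callers that already hold a HoodieBaseFile or HoodieLogFile now use the accessors on those objects instead of re-parsing the file name through static FSUtils helpers on every call. A minimal sketch of the call-pattern change (the wrapper class and method below are illustrative only, not part of the patch):

    import org.apache.hudi.common.model.HoodieBaseFile;

    class FileIdentityExample {
      static String describe(HoodieBaseFile file) {
        // Before: FSUtils.getCommitTime(file.getFileName()) and
        // FSUtils.getFileId(file.getFileName()) re-split the name on each call.
        // After: the object parses its name once and serves cached fields.
        return file.getFileId() + "@" + file.getCommitTime();
      }
    }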

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 82f9c1a6468..cf36a704c7d 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.hadoop.CachingPath;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
@@ -232,9 +233,9 @@ public class HoodieLogFileCommand {
     } else {
       for (String logFile : logFilePaths) {
         Schema writerSchema = new AvroSchemaConverter()
-            .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
+            .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile))));
         HoodieLogFormat.Reader reader =
-            HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
+            HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema);
         // read the avro blocks
         while (reader.hasNext()) {
           HoodieLogBlock n = reader.next();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 8ff562c2070..257d2cd855c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.table.action.compact.OperationResult;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -244,8 +245,8 @@ public class CompactionAdminClient extends BaseHoodieClient {
         merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList());
     return logFilesToBeMoved.stream().map(lf -> {
       ValidationUtils.checkArgument(lf.getLogVersion() - maxVersion > 0, "Expect new log version to be sane");
-      HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
-          FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
+      HoodieLogFile newLogFile = new HoodieLogFile(new CachingPath(lf.getPath().getParent(),
+          FSUtils.makeLogFileName(lf.getFileId(), "." + lf.getFileExtension(),
              compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
       return Pair.of(lf, newLogFile);
     }).collect(Collectors.toList());
@@ -450,7 +451,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
         .orElse(fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getPath().getParent().toString()).get());
     for (HoodieLogFile toRepair : logFilesToRepair) {
       int version = maxUsedVersion + 1;
-      HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
+      HoodieLogFile newLf = new HoodieLogFile(new CachingPath(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
           logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
       result.add(Pair.of(toRepair, newLf));
       maxUsedVersion = version;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index fcd7135833f..ab7ccd1b49b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
@@ -47,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -214,12 +214,15 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
 
     String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
+    List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable);
     // Partition and file name pairs
-    List<Pair<String, String>> partitionFileNameList =
-        HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
-            .map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName()))
-            .sorted()
-            .collect(toList());
+    List<Pair<String, String>> partitionFileNameList = new ArrayList<>(baseFilesForAllPartitions.size());
+    Map<Pair<String, String>, String> partitionAndFileNameToFileId = new HashMap<>(baseFilesForAllPartitions.size(), 1);
+    baseFilesForAllPartitions.forEach(pair -> {
+      Pair<String, String> partitionAndFileName = Pair.of(pair.getKey(), pair.getValue().getFileName());
+      partitionFileNameList.add(partitionAndFileName);
+      partitionAndFileNameToFileId.put(partitionAndFileName, pair.getValue().getFileId());
+    });
 
     if (partitionFileNameList.isEmpty()) {
       return Collections.emptyList();
@@ -233,7 +236,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
     for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
       result.add(Pair.of(entry.getKey().getLeft(),
           new BloomIndexFileInfo(
-              FSUtils.getFileId(entry.getKey().getRight()),
+              partitionAndFileNameToFileId.get(entry.getKey()),
              // NOTE: Here we assume that the type of the primary key field is string
               (String) unwrapAvroValueWrapper(entry.getValue().getMinValue()),
               (String) unwrapAvroValueWrapper(entry.getValue().getMaxValue())
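
Since getBloomFilters keys its result by (partition, fileName), the hunk above builds a side map so the fileId can be recovered without re-parsing the file name. The same pattern in isolation, as a sketch (assumes Hudi's Pair and an already-listed set of base files; the class and method names are hypothetical):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.util.collection.Pair;

    class FileIdLookupExample {
      // Build (partition, fileName) -> fileId once, so later lookups keyed by
      // file name never have to re-derive the fileId from the name.
      static Map<Pair<String, String>, String> index(List<Pair<String, HoodieBaseFile>> baseFiles) {
        Map<Pair<String, String>, String> result = new HashMap<>(baseFiles.size(), 1);
        baseFiles.forEach(pair ->
            result.put(Pair.of(pair.getKey(), pair.getValue().getFileName()), pair.getValue().getFileId()));
        return result;
      }
    }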
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 7aa357b7504..21c0059474e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -173,7 +173,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     writeStatus.setStat(new HoodieWriteStat());
     try {
       String latestValidFilePath = baseFileToMerge.getFileName();
-      writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
+      writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
 
       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
@@ -471,7 +471,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
           String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
               writeStatus.getFileId(), writeStatus.getPartitionPath(),
               instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
-              FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
+              baseFileToMerge.getCommitTime(), oldNumWrites));
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 9e716a280e8..81480767599 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -253,7 +253,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
         .withSizeThreshold(config.getLogFileMaxSize())
         .withFs(fs)
         .withRolloverLogWriteToken(writeToken)
-        .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
+        .withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken))
         .withSuffix(suffix)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 07aea4d934f..53ad933197e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -164,7 +165,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(hadoopConf);
     this.metrics = Option.empty();
-    this.enabledPartitionTypes = new ArrayList<>();
+    this.enabledPartitionTypes = new ArrayList<>(4);
 
     this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
 
@@ -481,10 +482,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     // Collect the list of latest base files present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
     fsView.loadAllPartitions();
-    final List<Pair<String, String>> partitionBaseFilePairs = new ArrayList<>();
+    final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
     for (String partition : partitions) {
       partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
-          .map(basefile -> Pair.of(partition, basefile.getFileName())).collect(Collectors.toList()));
+          .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
     }
 
     LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
@@ -509,7 +510,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * Read the record keys from base files in partitions and return records.
    */
   private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
-                                                               List<Pair<String, String>> partitionBaseFilePairs,
+                                                               List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs,
                                                                boolean forDelete) {
     if (partitionBaseFilePairs.isEmpty()) {
       return engineContext.emptyHoodieData();
@@ -517,13 +518,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files");
     final int parallelism = Math.min(partitionBaseFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
-    return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(p -> {
-      final String partition = p.getKey();
-      final String filename = p.getValue();
+    return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+      final String filename = baseFile.getFileName();
       Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + Path.SEPARATOR + filename);
 
-      final String fileId = FSUtils.getFileId(filename);
-      final String instantTime = FSUtils.getCommitTime(filename);
+      final String fileId = baseFile.getFileId();
+      final String instantTime = baseFile.getCommitTime();
       HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), dataFilePath);
       ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
 
@@ -1370,10 +1372,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
     final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
         dataMetaClient.getActiveTimeline(), metadata);
-    List<Pair<String, String>> partitionBaseFilePairs = replaceCommitMetadata
+    List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
         .getPartitionToReplaceFileIds()
         .keySet().stream().flatMap(partition
-            -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f.getFileName())))
+            -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
         .collect(Collectors.toList());
 
     return readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, true);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
index 1c8ba1b175d..ad7111d70a2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
@@ -253,9 +253,8 @@ public class JavaUpsertPartitioner<T> implements Partitioner  {
 
       for (HoodieBaseFile file : allFiles) {
         if (file.getFileSize() < config.getParquetSmallFileLimit()) {
-          String filename = file.getFileName();
           SmallFile sf = new SmallFile();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
           sf.sizeBytes = file.getFileSize();
           smallFileLocations.add(sf);
         }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 43d3a8a8e2e..2b6a96b3d05 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -20,7 +20,6 @@ package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -84,7 +83,7 @@ public class HoodieFileProbingFunction implements
     @Override
     protected List<HoodieKeyLookupResult> computeNext() {
       // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>();
+      final Map<Pair<String, HoodieBaseFile>, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>();
       final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
 
       while (inputItr.hasNext()) {
@@ -103,7 +102,7 @@ public class HoodieFileProbingFunction implements
           fileIDBaseFileMap.put(fileId, baseFile.get());
         }
 
-        fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), entry._2);
+        fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)), entry._2);
 
         if (fileToLookupResults.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
           break;
@@ -116,12 +115,11 @@ public class HoodieFileProbingFunction implements
 
       return fileToLookupResults.entrySet().stream()
           .map(entry -> {
-            Pair<String, String> partitionPathFileNamePair = entry.getKey();
+            Pair<String, HoodieBaseFile> partitionPathFileNamePair = entry.getKey();
             HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = entry.getValue();
 
             final String partitionPath = partitionPathFileNamePair.getLeft();
-            final String fileName = partitionPathFileNamePair.getRight();
-            final String fileId = FSUtils.getFileId(fileName);
+            final String fileId = partitionPathFileNamePair.getRight().getFileId();
             ValidationUtils.checkState(!fileId.isEmpty());
 
             List<String> candidateRecordKeys = bloomFilterKeyLookupResult.getCandidateKeys();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
index 5dacad7320e..c96bd8740fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
@@ -20,7 +20,6 @@ package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
@@ -44,6 +43,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
@@ -93,7 +93,7 @@ public class HoodieMetadataBloomFilterProbingFunction implements
     @Override
     protected Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> computeNext() {
       // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+      final Map<Pair<String, HoodieBaseFile>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
       final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
 
       while (inputItr.hasNext()) {
@@ -110,7 +110,7 @@ public class HoodieMetadataBloomFilterProbingFunction implements
           fileIDBaseFileMap.put(fileId, baseFile.get());
         }
 
-        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()),
+        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)),
             k -> new ArrayList<>()).add(new HoodieKey(entry._2, partitionPath));
 
         if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
@@ -122,20 +122,19 @@ public class HoodieMetadataBloomFilterProbingFunction implements
         return Collections.emptyIterator();
       }
 
-      List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
+      List<Pair<String, String>> partitionNameFileNameList = fileToKeysMap.keySet().stream().map(pair -> Pair.of(pair.getLeft(), pair.getRight().getFileName())).collect(Collectors.toList());
       Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
           hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
 
       return fileToKeysMap.entrySet().stream()
           .map(entry -> {
-            Pair<String, String> partitionPathFileNamePair = entry.getKey();
             List<HoodieKey> hoodieKeyList = entry.getValue();
-
-            final String partitionPath = partitionPathFileNamePair.getLeft();
-            final String fileName = partitionPathFileNamePair.getRight();
-            final String fileId = FSUtils.getFileId(fileName);
+            final String partitionPath = entry.getKey().getLeft();
+            final HoodieBaseFile baseFile = entry.getKey().getRight();
+            final String fileId = baseFile.getFileId();
             ValidationUtils.checkState(!fileId.isEmpty());
 
+            Pair<String, String> partitionPathFileNamePair = Pair.of(partitionPath, baseFile.getFileName());
             if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
               throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
             }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index d98524fc38f..edd6d981d18 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -302,9 +302,8 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
 
       for (HoodieBaseFile file : allFiles) {
         if (file.getFileSize() < config.getParquetSmallFileLimit()) {
-          String filename = file.getFileName();
           SmallFile sf = new SmallFile();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
           sf.sizeBytes = file.getFileSize();
           smallFileLocations.add(sf);
         }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
index a5dd9978939..dc8f267718f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
@@ -19,8 +19,8 @@
 package org.apache.hudi.table.action.deltacommit;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -32,6 +32,7 @@ import org.apache.hudi.table.action.commit.SmallFile;
 import org.apache.hudi.table.action.commit.UpsertPartitioner;
 
 import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -69,15 +70,14 @@ public class SparkUpsertDeltaCommitPartitioner<T> extends UpsertPartitioner<T> {
     for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
       SmallFile sf = new SmallFile();
       if (smallFileSlice.getBaseFile().isPresent()) {
-        // TODO : Move logic of file name, file id, base commit time handling inside file slice
-        String filename = smallFileSlice.getBaseFile().get().getFileName();
-        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        HoodieBaseFile baseFile = smallFileSlice.getBaseFile().get();
+        sf.location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId());
         sf.sizeBytes = getTotalFileSize(smallFileSlice);
         smallFileLocations.add(sf);
       } else {
         HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.location = new HoodieRecordLocation(logFile.getBaseCommitTime(),
+            logFile.getFileId());
         sf.sizeBytes = getTotalFileSize(smallFileSlice);
         smallFileLocations.add(sf);
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 234af8a7be3..455f80af497 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -193,9 +193,9 @@ public class FSUtils {
   public static String getCommitTime(String fullFileName) {
     try {
       if (isLogFile(fullFileName)) {
-        return fullFileName.split("_")[1].split("\\.")[0];
+        return fullFileName.split("_")[1].split("\\.", 2)[0];
       }
-      return fullFileName.split("_")[2].split("\\.")[0];
+      return fullFileName.split("_")[2].split("\\.", 2)[0];
     } catch (ArrayIndexOutOfBoundsException e) {
      throw new HoodieException("Failed to get commit time from filename: " + fullFileName, e);
     }
@@ -206,7 +206,7 @@ public class FSUtils {
   }
 
   public static String getFileId(String fullFileName) {
-    return fullFileName.split("_")[0];
+    return fullFileName.split("_", 2)[0];
   }
 
   /**
@@ -454,15 +454,6 @@ public class FSUtils {
     return Integer.parseInt(matcher.group(4));
   }
 
-  public static String getSuffixFromLogPath(Path path) {
-    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
-    if (!matcher.find()) {
-      throw new InvalidHoodiePathException(path, "LogFile");
-    }
-    String val = matcher.group(10);
-    return val == null ? "" : val;
-  }
-
   public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version,
       String writeToken) {
     String suffix = (writeToken == null)
@@ -547,7 +538,7 @@ public class FSUtils {
         getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
     if (latestLogFile.isPresent()) {
       return Option
-          .of(Pair.of(latestLogFile.get().getLogVersion(), getWriteTokenFromLogPath(latestLogFile.get().getPath())));
+          .of(Pair.of(latestLogFile.get().getLogVersion(), latestLogFile.get().getLogWriteToken()));
     }
     return Option.empty();
   }
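
The split(..., 2) changes in getCommitTime and getFileId above cap how many parts String.split produces, so the scan stops at the first delimiter instead of tokenizing the whole remainder. A small standalone demonstration (the file name is a made-up example in the standard base-file layout):

    public class SplitLimitDemo {
      public static void main(String[] args) {
        String fullFileName = "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet";
        // limit 2: at most two parts, so everything after the first '.' stays in parts[1]
        String commitTime = fullFileName.split("_")[2].split("\\.", 2)[0];
        String fileId = fullFileName.split("_", 2)[0];
        System.out.println(fileId + " / " + commitTime); // prints the UUID / 100
      }
    }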
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
index 7c4c8977795..cfca6c50c75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
@@ -36,7 +36,7 @@ public class BaseFile implements Serializable {
 
   private transient FileStatus fileStatus;
   private final String fullPath;
-  private final String fileName;
+  protected final String fileName;
   private long fileLen;
 
   public BaseFile(BaseFile dataFile) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
index 14f8f59b3da..861271b0630 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
@@ -73,8 +73,8 @@ public class CompactionOperation implements Serializable {
     } else {
       assert logFiles.size() > 0;
       this.dataFileName = Option.empty();
-      this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
-      this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath()));
+      this.baseInstantTime = logFiles.get(0).getBaseCommitTime();
+      this.id = new HoodieFileGroupId(partitionPath, logFiles.get(0).getFileId());
       this.dataFileCommitTime = Option.empty();
       this.bootstrapFilePath = Option.empty();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
index 2c640bb1432..ed1c32698eb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -28,12 +27,19 @@ import org.apache.hadoop.fs.FileStatus;
  * Supports APIs to get Hudi FileId, Commit Time and bootstrap file (if any).
  */
 public class HoodieBaseFile extends BaseFile {
+  private static final long serialVersionUID = 1L;
+  private static final char UNDERSCORE = '_';
+  private static final char DOT = '.';
+  private final String fileId;
+  private final String commitTime;
 
   private Option<BaseFile> bootstrapBaseFile;
 
   public HoodieBaseFile(HoodieBaseFile dataFile) {
     super(dataFile);
     this.bootstrapBaseFile = dataFile.bootstrapBaseFile;
+    this.fileId = dataFile.getFileId();
+    this.commitTime = dataFile.getCommitTime();
   }
 
   public HoodieBaseFile(FileStatus fileStatus) {
@@ -43,6 +49,9 @@ public class HoodieBaseFile extends BaseFile {
   public HoodieBaseFile(FileStatus fileStatus, BaseFile bootstrapBaseFile) {
     super(fileStatus);
     this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
+    String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName();
+    this.fileId = fileIdAndCommitTime[0];
+    this.commitTime = fileIdAndCommitTime[1];
   }
 
   public HoodieBaseFile(String filePath) {
@@ -52,14 +61,45 @@ public class HoodieBaseFile extends BaseFile {
   public HoodieBaseFile(String filePath, BaseFile bootstrapBaseFile) {
     super(filePath);
     this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile);
+    String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName();
+    this.fileId = fileIdAndCommitTime[0];
+    this.commitTime = fileIdAndCommitTime[1];
+  }
+
+  /**
+   * Parses the file ID and commit time from the fileName.
+   * @return String array of size 2 with fileId as the first and commitTime as the second element.
+   */
+  private String[] getFileIdAndCommitTimeFromFileName() {
+    String[] values = new String[2];
+    short underscoreCount = 0;
+    short lastUnderscoreIndex = 0;
+    for (int i = 0; i < fileName.length(); i++) {
+      char c = fileName.charAt(i);
+      if (c == UNDERSCORE) {
+        if (underscoreCount == 0) {
+          values[0] = fileName.substring(0, i);
+        }
+        lastUnderscoreIndex = (short) i;
+        underscoreCount++;
+      } else if (c == DOT) {
+        if (underscoreCount == 2) {
+          values[1] = fileName.substring(lastUnderscoreIndex + 1, i);
+          return values;
+        }
+      }
+    }
+    // case where there is no '.' in file name (no file suffix like .parquet)
+    values[1] = fileName.substring(lastUnderscoreIndex + 1);
+    return values;
   }
 
   public String getFileId() {
-    return FSUtils.getFileId(getFileName());
+    return fileId;
   }
 
   public String getCommitTime() {
-    return FSUtils.getCommitTime(getFileName());
+    return commitTime;
   }
 
   public Option<BaseFile> getBootstrapBaseFile() {
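
HoodieBaseFile now derives both fields in a single pass over the name instead of repeated split calls. A simplified standalone equivalent of that parse, assuming the standard base-file layout <fileId>_<writeToken>_<commitTime>[.<extension>] (this index-based version is a sketch; the committed code walks the characters once, as in the hunk above):

    public class BaseFileNameParse {
      public static void main(String[] args) {
        String fileName = "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet";
        int firstUnderscore = fileName.indexOf('_');
        int lastUnderscore = fileName.lastIndexOf('_');
        int dot = fileName.indexOf('.', lastUnderscore);
        String fileId = fileName.substring(0, firstUnderscore);
        // no '.' after the last '_' means no suffix like .parquet
        String commitTime = dot < 0
            ? fileName.substring(lastUnderscore + 1)
            : fileName.substring(lastUnderscore + 1, dot);
        System.out.println(fileId + " / " + commitTime); // prints the UUID / 100
      }
    }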
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index 988194964f7..ecfbd925dd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.hadoop.CachingPath;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,6 +30,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.Objects;
+import java.util.regex.Matcher;
+
+import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;
 
 /**
  * Abstracts a single log file. Contains methods to extract metadata like the fileId, version and extension from the log
@@ -46,65 +51,115 @@ public class HoodieLogFile implements Serializable {
   private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed();
 
   private transient FileStatus fileStatus;
+  private transient Path path;
   private final String pathStr;
+  private String fileId;
+  private String baseCommitTime;
+  private int logVersion;
+  private String logWriteToken;
+  private String fileExtension;
+  private String suffix;
   private long fileLen;
 
   public HoodieLogFile(HoodieLogFile logFile) {
-    this.fileStatus = logFile.fileStatus;
+    this.fileStatus = logFile.getFileStatus();
+    this.path = logFile.getPath();
     this.pathStr = logFile.pathStr;
-    this.fileLen = logFile.fileLen;
+    this.fileId = logFile.getFileId();
+    this.baseCommitTime = logFile.getBaseCommitTime();
+    this.logVersion = logFile.getLogVersion();
+    this.logWriteToken = logFile.getLogWriteToken();
+    this.fileExtension = logFile.getFileExtension();
+    this.suffix = logFile.getSuffix();
+    this.fileLen = logFile.getFileSize();
   }
 
   public HoodieLogFile(FileStatus fileStatus) {
-    this.fileStatus = fileStatus;
-    this.pathStr = fileStatus.getPath().toString();
-    this.fileLen = fileStatus.getLen();
+    this(fileStatus, fileStatus.getPath(), fileStatus.getPath().toString(), fileStatus.getLen());
   }
 
   public HoodieLogFile(Path logPath) {
-    this.fileStatus = null;
-    this.pathStr = logPath.toString();
-    this.fileLen = -1;
+    this(null, logPath, logPath.toString(), -1);
   }
 
-  public HoodieLogFile(Path logPath, Long fileLen) {
-    this.fileStatus = null;
-    this.pathStr = logPath.toString();
-    this.fileLen = fileLen;
+  public HoodieLogFile(Path logPath, long fileLen) {
+    this(null, logPath, logPath.toString(), fileLen);
   }
 
   public HoodieLogFile(String logPathStr) {
-    this.fileStatus = null;
+    this(null, null, logPathStr, -1);
+  }
+
+  private HoodieLogFile(FileStatus fileStatus, Path logPath, String logPathStr, long fileLen) {
+    this.fileStatus = fileStatus;
     this.pathStr = logPathStr;
-    this.fileLen = -1;
+    this.fileLen = fileLen;
+    this.logVersion = -1; // mark version as uninitialized
+    if (logPath instanceof CachingPath) {
+      this.path = logPath;
+    }
+  }
+
+  private void parseFieldsFromPath() {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(getPath().getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path, "LogFile");
+    }
+    this.fileId = matcher.group(1);
+    this.baseCommitTime = matcher.group(2);
+    this.fileExtension = matcher.group(3);
+    this.logVersion = Integer.parseInt(matcher.group(4));
+    this.logWriteToken = matcher.group(6);
+    this.suffix = matcher.group(10) == null ? "" : matcher.group(10);
   }
 
   public String getFileId() {
-    return FSUtils.getFileIdFromLogPath(getPath());
+    if (fileId == null) {
+      parseFieldsFromPath();
+    }
+    return fileId;
   }
 
   public String getBaseCommitTime() {
-    return FSUtils.getBaseCommitTimeFromLogPath(getPath());
+    if (baseCommitTime == null) {
+      parseFieldsFromPath();
+    }
+    return baseCommitTime;
   }
 
   public int getLogVersion() {
-    return FSUtils.getFileVersionFromLog(getPath());
+    if (logVersion == -1) {
+      parseFieldsFromPath();
+    }
+    return logVersion;
   }
 
   public String getLogWriteToken() {
-    return FSUtils.getWriteTokenFromLogPath(getPath());
+    if (logWriteToken == null) {
+      parseFieldsFromPath();
+    }
+    return logWriteToken;
   }
 
   public String getFileExtension() {
-    return FSUtils.getFileExtensionFromLog(getPath());
+    if (fileExtension == null) {
+      parseFieldsFromPath();
+    }
+    return fileExtension;
   }
 
   public String getSuffix() {
-    return FSUtils.getSuffixFromLogPath(getPath());
+    if (suffix == null) {
+      parseFieldsFromPath();
+    }
+    return suffix;
   }
 
   public Path getPath() {
-    return new Path(pathStr);
+    if (path == null) {
+      path = new CachingPath(pathStr);
+    }
+    return path;
   }
 
   public String getFileName() {
@@ -131,9 +186,9 @@ public class HoodieLogFile implements Serializable {
     String fileId = getFileId();
     String baseCommitTime = getBaseCommitTime();
     Path path = getPath();
-    String extension = "." + FSUtils.getFileExtensionFromLog(path);
+    String extension = "." + fileExtension;
     int newVersion = FSUtils.computeNextLogVersion(fs, path.getParent(), fileId, extension, baseCommitTime);
-    return new HoodieLogFile(new Path(path.getParent(),
+    return new HoodieLogFile(new CachingPath(path.getParent(),
         FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion, logWriteToken)));
   }
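
HoodieLogFile now parses all name-derived fields lazily, on first access, and caches them; getLogVersion() uses -1 as its "not yet parsed" sentinel. A condensed sketch of the memoization pattern (the regex below is a simplified stand-in for FSUtils.LOG_FILE_PATTERN and only handles the common .fileId_baseCommit.log.version_writeToken shape):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LazyLogFileName {
      private static final Pattern SIMPLE_LOG_PATTERN =
          Pattern.compile("^\\.([^_]+)_([^.]+)\\.([^.]+)\\.(\\d+)_(.+)$");
      private final String name;
      private String fileId; // null until the first accessor call

      LazyLogFileName(String name) {
        this.name = name;
      }

      String getFileId() {
        if (fileId == null) { // parse once, memoize for later calls
          Matcher m = SIMPLE_LOG_PATTERN.matcher(name);
          if (!m.find()) {
            throw new IllegalArgumentException("not a log file name: " + name);
          }
          fileId = m.group(1);
        }
        return fileId;
      }

      public static void main(String[] args) {
        System.out.println(new LazyLogFileName(
            ".136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1").getFileId());
      }
    }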
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 6ef1a6f5542..7b1e737610b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -39,13 +39,13 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -237,7 +237,7 @@ public abstract class AbstractHoodieLogRecordReader {
     try {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
           readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
@@ -396,7 +396,7 @@ public abstract class AbstractHoodieLogRecordReader {
     try {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()),
           readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index f268accb706..6759650af78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,7 +115,8 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
     //       is prefixed with an appropriate scheme given that we're not propagating the FS
     //       further
-    this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
+    Path updatedPath = FSUtils.makeQualified(fs, logFile.getPath());
+    this.logFile = updatedPath.equals(logFile.getPath()) ? logFile : new HoodieLogFile(updatedPath, logFile.getFileSize());
     this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
     this.readerSchema = readerSchema;
     this.readBlockLazily = readBlockLazily;
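
One subtlety in the reader change above: when makeQualified leaves the path unchanged, the original HoodieLogFile (with any already-cached parsed fields) is reused instead of being rebuilt. The same guard as a small helper, for illustration only (the class and method names are hypothetical):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieLogFile;

    final class QualifyOnce {
      // Rebuild only when qualification actually changed the path; otherwise keep
      // the existing object and its lazily parsed, cached fields.
      static HoodieLogFile qualified(FileSystem fs, HoodieLogFile logFile) {
        Path updated = FSUtils.makeQualified(fs, logFile.getPath());
        return updated.equals(logFile.getPath())
            ? logFile
            : new HoodieLogFile(updated, logFile.getFileSize());
      }
    }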
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
index b50b74e65df..1ff93327f7f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java
@@ -21,8 +21,7 @@ package org.apache.hudi.common.table.view;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
 
 import java.util.Collections;
 import java.util.List;
@@ -72,7 +71,7 @@ public class HoodieTablePreCommitFileSystemView {
     Map<String, HoodieBaseFile> newFilesWrittenForPartition = filesWritten.stream()
         .filter(file -> partitionStr.equals(file.getPartitionPath()))
         .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
-            new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString())));
+            new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(), writeStat.getPath()).toString())));
 
     Stream<HoodieBaseFile> committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
     Map<String, HoodieBaseFile> allFileIds = committedBaseFiles
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
new file mode 100644
index 00000000000..15a93cd232b
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieBaseFile {
+  private final String fileName = "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet";
+  private final String pathStr = "file:/tmp/hoodie/2021/01/01/" + fileName;
+  private final String fileId = "136281f3-c24e-423b-a65a-95dbfbddce1d";
+  private final String baseCommitTime = "100";
+  private final int length = 10;
+
+  @Test
+  void createFromHoodieBaseFile() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus);
+    assertFileGetters(fileStatus, new HoodieBaseFile(hoodieBaseFile), length, Option.empty());
+  }
+
+  @Test
+  void createFromFileStatus() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus);
+    assertFileGetters(fileStatus, hoodieBaseFile, length, Option.empty());
+  }
+
+  @Test
+  void createFromFileStatusAndBootstrapBaseFile() {
+    HoodieBaseFile bootstrapBaseFile = new HoodieBaseFile(pathStr);
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus, bootstrapBaseFile);
+    assertFileGetters(fileStatus, hoodieBaseFile, length, Option.of(bootstrapBaseFile));
+  }
+
+  @Test
+  void createFromFilePath() {
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(pathStr);
+    assertFileGetters(null, hoodieBaseFile, -1, Option.empty());
+  }
+
+  @Test
+  void createFromFilePathAndBootstrapBaseFile() {
+    HoodieBaseFile bootstrapBaseFile = new HoodieBaseFile(pathStr);
+    HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(pathStr, bootstrapBaseFile);
+    assertFileGetters(null, hoodieBaseFile, -1, Option.of(bootstrapBaseFile));
+  }
+
+  private void assertFileGetters(FileStatus fileStatus, HoodieBaseFile hoodieBaseFile, long fileLength, Option<HoodieBaseFile> bootstrapBaseFile) {
+    assertEquals(fileId, hoodieBaseFile.getFileId());
+    assertEquals(baseCommitTime, hoodieBaseFile.getCommitTime());
+    assertEquals(bootstrapBaseFile, hoodieBaseFile.getBootstrapBaseFile());
+    assertEquals(fileName, hoodieBaseFile.getFileName());
+    assertEquals(pathStr, hoodieBaseFile.getPath());
+    assertEquals(new Path(pathStr), hoodieBaseFile.getHadoopPath());
+    assertEquals(fileLength, hoodieBaseFile.getFileSize());
+    assertEquals(fileStatus, hoodieBaseFile.getFileStatus());
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
new file mode 100644
index 00000000000..1096d222ad9
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieLogFile {
+  private final String pathStr = "file:///tmp/hoodie/2021/01/01/.136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1";
+  private final String fileId = "136281f3-c24e-423b-a65a-95dbfbddce1d";
+  private final String baseCommitTime = "100";
+  private final int logVersion = 2;
+  private final String writeToken = "1-0-1";
+  private final String fileExtension = "log";
+
+  private final int length = 10;
+
+  @Test
+  void createFromLogFile() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(fileStatus);
+    assertFileGetters(fileStatus, new HoodieLogFile(hoodieLogFile), length);
+  }
+
+  @Test
+  void createFromFileStatus() {
+    FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr));
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(fileStatus);
+    assertFileGetters(fileStatus, hoodieLogFile, length);
+  }
+
+  @Test
+  void createFromPath() {
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(new Path(pathStr));
+    assertFileGetters(null, hoodieLogFile, -1);
+  }
+
+  @Test
+  void createFromPathAndLength() {
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(new Path(pathStr), length);
+    assertFileGetters(null, hoodieLogFile, length);
+  }
+
+  @Test
+  void createFromString() {
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(pathStr);
+    assertFileGetters(null, hoodieLogFile, -1);
+  }
+
+  @Test
+  void createFromStringWithSuffix() {
+    String suffix = ".cdc";
+    String pathWithSuffix = pathStr + suffix;
+    HoodieLogFile hoodieLogFile = new HoodieLogFile(pathWithSuffix);
+    assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1, suffix);
+  }
+
+  private void assertFileGetters(FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength) {
+    assertFileGetters(pathStr, fileStatus, hoodieLogFile, fileLength, "");
+  }
+
+  private void assertFileGetters(String pathStr, FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength, String suffix) {
+    assertEquals(fileId, hoodieLogFile.getFileId());
+    assertEquals(baseCommitTime, hoodieLogFile.getBaseCommitTime());
+    assertEquals(logVersion, hoodieLogFile.getLogVersion());
+    assertEquals(writeToken, hoodieLogFile.getLogWriteToken());
+    assertEquals(fileExtension, hoodieLogFile.getFileExtension());
+    assertEquals(new Path(pathStr), hoodieLogFile.getPath());
+    assertEquals(fileLength, hoodieLogFile.getFileSize());
+    assertEquals(fileStatus, hoodieLogFile.getFileStatus());
+    assertEquals(suffix, hoodieLogFile.getSuffix());
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index d63696effba..894a6463a44 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -19,8 +19,8 @@
 package org.apache.hudi.sink.partitioner.profile;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -70,16 +70,14 @@ public class DeltaWriteProfile extends WriteProfile {
       for (FileSlice smallFileSlice : allSmallFileSlices) {
         SmallFile sf = new SmallFile();
         if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          HoodieBaseFile baseFile = smallFileSlice.getBaseFile().get();
+          sf.location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId());
           sf.sizeBytes = getTotalFileSize(smallFileSlice);
           smallFileLocations.add(sf);
         } else {
           smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> {
             // in case there is something error, and the file slice has no log file
-            sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-                FSUtils.getFileIdFromLogPath(logFile.getPath()));
+            sf.location = new HoodieRecordLocation(logFile.getBaseCommitTime(), logFile.getFileId());
             sf.sizeBytes = getTotalFileSize(smallFileSlice);
             smallFileLocations.add(sf);
           });
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index 9dd604a717f..1f41888ff45 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.partitioner.profile;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -210,9 +209,8 @@ public class WriteProfile {
       for (HoodieBaseFile file : allFiles) {
         // filter out the corrupted files.
         if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
-          String filename = file.getFileName();
           SmallFile sf = new SmallFile();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
           sf.sizeBytes = file.getFileSize();
           smallFileLocations.add(sf);
         }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
index 4b0b2d6ea79..043122fbdf8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
@@ -18,12 +18,14 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.hadoop.InputSplitUtils;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -126,7 +128,7 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
     for (int i = 0; i < totalLogFiles; i++) {
       String logFilePath = InputSplitUtils.readString(in);
       long logFileSize = in.readLong();
-      deltaLogPaths.add(new HoodieLogFile(new Path(logFilePath), logFileSize));
+      deltaLogPaths.add(new HoodieLogFile(new CachingPath(logFilePath), logFileSize));
     }
     setDeltaLogFiles(deltaLogPaths);
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
index d2c4f1be61d..4b0f379aedb 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
@@ -70,7 +70,8 @@ public class TestHoodieRealtimeFileSplit {
   @BeforeEach
   public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception {
     basePath = tempDir.toAbsolutePath().toString();
-    deltaLogFiles = Collections.singletonList(new HoodieLogFile(new Path(basePath + "/1.log"), 0L));
+    Path logPath = new Path(basePath + "/1.log");
+    deltaLogFiles = Collections.singletonList(new HoodieLogFile(logPath, 0L));
     deltaLogPaths = Collections.singletonList(basePath + "/1.log");
     fileSplitName = basePath + "/test.file";
     baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 16c0b011479..d79957c735f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -875,7 +875,7 @@ public class HoodieMetadataTableValidator implements Serializable {
         }
         Schema readerSchema = converter.convert(messageType);
         reader =
-            HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema);
+            HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema);
         // read the avro blocks
         if (reader.hasNext()) {
           HoodieLogBlock block = reader.next();
