This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 97b4d591aa0 [HUDI-8073] Add hosts to storage path info and use it if present (#11761)
97b4d591aa0 is described below

commit 97b4d591aa07c5ec8bbebf748fd5383c30393914
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 16 18:31:09 2024 -0400

    [HUDI-8073] Add hosts to storage path info and use it if present (#11761)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Shawn Chang <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/common/engine/HoodieReaderContext.java    | 19 +++++++++++++++
 .../apache/hudi/common/model/HoodieBaseFile.java   |  2 +-
 .../common/table/read/HoodieFileGroupReader.java   | 27 +++++++++++++++++-----
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   | 11 +++++++++
 .../hudi/hadoop/HiveHoodieReaderContext.java       | 21 +++++++++++++----
 .../HoodieFileGroupReaderBasedRecordReader.java    | 22 +++++++-----------
 .../org/apache/hudi/storage/StoragePathInfo.java   | 23 ++++++++++++++++++
 7 files changed, 99 insertions(+), 26 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index b12a11d4b57..5eec66d15e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -159,6 +160,24 @@ public abstract class HoodieReaderContext<T> {
       StoragePath filePath, long start, long length, Schema dataSchema, Schema 
requiredSchema,
       HoodieStorage storage) throws IOException;
 
+  /**
+   * Gets the record iterator based on the type of engine-specific record 
representation from the
+   * file.
+   *
+   * @param storagePathInfo {@link StoragePathInfo} instance of a file.
+   * @param start           Starting byte to start reading.
+   * @param length          Bytes to read.
+   * @param dataSchema      Schema of records in the file in {@link Schema}.
+   * @param requiredSchema  Schema containing required fields to read in 
{@link Schema} for projection.
+   * @param storage         {@link HoodieStorage} for reading records.
+   * @return {@link ClosableIterator<T>} that can return all records through 
iteration.
+   */
+  public ClosableIterator<T> getFileRecordIterator(
+      StoragePathInfo storagePathInfo, long start, long length, Schema 
dataSchema, Schema requiredSchema,
+      HoodieStorage storage) throws IOException {
+    return getFileRecordIterator(storagePathInfo.getPath(), start, length, 
dataSchema, requiredSchema, storage);
+  }
+
   /**
    * Converts an Avro record, e.g., serialized in the log files, to an 
engine-specific record.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
index 5b8c3fcb11f..f5631078270 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
@@ -145,7 +145,7 @@ public class HoodieBaseFile extends BaseFile {
       StoragePath parent = pathInfo.getPath().getParent();
       return new StoragePathInfo(
           new StoragePath(parent, fileId), pathInfo.getLength(), 
pathInfo.isDirectory(),
-          pathInfo.getBlockReplication(), pathInfo.getBlockSize(), 
pathInfo.getModificationTime());
+          pathInfo.getBlockReplication(), pathInfo.getBlockSize(), 
pathInfo.getModificationTime(), pathInfo.getLocations());
     } else {
       return pathInfo;
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 1c59e8f0ba2..e99d076e62f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -41,6 +41,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import org.apache.avro.Schema;
 
@@ -145,10 +146,18 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       return makeBootstrapBaseFileIterator(baseFile);
     }
 
-    return readerContext.getFileRecordIterator(
-        baseFile.getStoragePath(), start, length,
-        readerContext.getSchemaHandler().getDataSchema(),
-        readerContext.getSchemaHandler().getRequiredSchema(), storage);
+    StoragePathInfo baseFileStoragePathInfo = baseFile.getPathInfo();
+    if (baseFileStoragePathInfo != null) {
+      return readerContext.getFileRecordIterator(
+          baseFileStoragePathInfo, start, length,
+          readerContext.getSchemaHandler().getDataSchema(),
+          readerContext.getSchemaHandler().getRequiredSchema(), storage);
+    } else {
+      return readerContext.getFileRecordIterator(
+          baseFile.getStoragePath(), start, length,
+          readerContext.getSchemaHandler().getDataSchema(),
+          readerContext.getSchemaHandler().getRequiredSchema(), storage);
+    }
   }
 
   private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile 
baseFile) throws IOException {
@@ -186,8 +195,14 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       return Option.empty();
     }
     Schema requiredSchema = 
readerContext.getSchemaHandler().createSchemaFromFields(requiredFields);
-    return 
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0, 
file.getFileLen(),
-        readerContext.getSchemaHandler().createSchemaFromFields(allFields), 
requiredSchema, storage), requiredSchema));
+    StoragePathInfo fileStoragePathInfo = file.getPathInfo();
+    if (fileStoragePathInfo != null) {
+      return 
Option.of(Pair.of(readerContext.getFileRecordIterator(fileStoragePathInfo, 0, 
file.getFileLen(),
+          readerContext.getSchemaHandler().createSchemaFromFields(allFields), 
requiredSchema, storage), requiredSchema));
+    } else {
+      return 
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0, 
file.getFileLen(),
+          readerContext.getSchemaHandler().createSchemaFromFields(allFields), 
requiredSchema, storage), requiredSchema));
+    }
   }
 
   /**
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index e8e92f6b420..655ebb2f7bd 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -171,6 +171,17 @@ public class HadoopFSUtils {
         fileStatus.getModificationTime());
   }
 
+  public static StoragePathInfo convertToStoragePathInfo(FileStatus 
fileStatus, String[] locations) {
+    return new StoragePathInfo(
+        convertToStoragePath(fileStatus.getPath()),
+        fileStatus.getLen(),
+        fileStatus.isDirectory(),
+        fileStatus.getReplication(),
+        fileStatus.getBlockSize(),
+        fileStatus.getModificationTime(),
+        locations);
+  }
+
   /**
    * @param pathInfo {@link StoragePathInfo} instance.
    * @return the {@link FileStatus} instance after conversion.
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 46fcba63112..d2b6ccd8ee8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -37,6 +37,7 @@ import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -76,7 +77,6 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
   protected final JobConf jobConf;
   protected final Reporter reporter;
   protected final Schema writerSchema;
-  protected Map<String, String[]> hosts;
   protected final Map<String, TypeInfo> columnTypeMap;
   private final ObjectInspectorCache objectInspectorCache;
   private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
@@ -91,14 +91,12 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
                                     JobConf jobConf,
                                     Reporter reporter,
                                     Schema writerSchema,
-                                    Map<String, String[]> hosts,
                                     HoodieTableMetaClient metaClient) {
     this.readerCreator = readerCreator;
     this.split = split;
     this.jobConf = jobConf;
     this.reporter = reporter;
     this.writerSchema = writerSchema;
-    this.hosts = hosts;
     this.partitionCols = getPartitionFieldNames(jobConf).stream().filter(n -> 
writerSchema.getField(n) != null).collect(Collectors.toList());
     this.partitionColSet = new HashSet<>(this.partitionCols);
     String tableName = metaClient.getTableConfig().getTableName();
@@ -141,14 +139,27 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
   }
 
   @Override
-  public ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath 
filePath, long start, long length, Schema dataSchema, Schema requiredSchema, 
HoodieStorage storage) throws IOException {
+  public ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath 
filePath, long start, long length, Schema dataSchema,
+                                                               Schema 
requiredSchema, HoodieStorage storage) throws IOException {
+    return getFileRecordIterator(filePath, null, start, length, dataSchema, 
requiredSchema, storage);
+  }
+
+  @Override
+  public ClosableIterator<ArrayWritable> getFileRecordIterator(
+      StoragePathInfo storagePathInfo, long start, long length, Schema 
dataSchema, Schema requiredSchema,
+      HoodieStorage storage) throws IOException {
+    return getFileRecordIterator(storagePathInfo.getPath(), 
storagePathInfo.getLocations(), start, length, dataSchema, requiredSchema, 
storage);
+  }
+
+  private ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath 
filePath, String[] hosts, long start, long length, Schema dataSchema,
+                                                                Schema 
requiredSchema, HoodieStorage storage) throws IOException {
     JobConf jobConfCopy = new JobConf(jobConf);
     //move the partition cols to the end, because in some cases it has issues 
if we don't do that
     Schema modifiedDataSchema = 
HoodieAvroUtils.generateProjectionSchema(dataSchema, 
Stream.concat(dataSchema.getFields().stream()
             .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> 
!partitionColSet.contains(n)),
         partitionCols.stream().filter(c -> dataSchema.getField(c) != 
null)).collect(Collectors.toList()));
     setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema);
-    InputSplit inputSplit = new FileSplit(new Path(filePath.toString()), 
start, length, hosts.get(filePath.toString()));
+    InputSplit inputSplit = new FileSplit(new Path(filePath.toString()), 
start, length, hosts);
     RecordReader<NullWritable, ArrayWritable> recordReader = 
readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter);
     if (firstRecordReader == null) {
       firstRecordReader = recordReader;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index e26e4b0efa9..f72b79b56e1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -54,10 +54,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -117,8 +115,7 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
     String latestCommitTime = getLatestCommitTime(split, metaClient);
     Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, 
latestCommitTime);
     Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
-    Map<String, String[]> hosts = new HashMap<>();
-    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, 
jobConfCopy, reporter, tableSchema, hosts, metaClient);
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, 
jobConfCopy, reporter, tableSchema, metaClient);
     this.arrayWritable = new ArrayWritable(Writable.class, new 
Writable[requestedSchema.getFields().size()]);
     TypedProperties props = metaClient.getTableConfig().getProps();
     jobConf.forEach(e -> {
@@ -128,7 +125,7 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
     });
     LOG.debug("Creating HoodieFileGroupReaderRecordReader with 
tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath, 
latestCommitTime, fileSplit.getPath());
     this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, 
metaClient.getStorage(), tableBasePath,
-        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, 
getFs(tableBasePath, jobConfCopy), tableBasePath),
+        latestCommitTime, getFileSliceFromSplit(fileSplit, 
getFs(tableBasePath, jobConfCopy), tableBasePath),
         tableSchema, requestedSchema, Option.empty(), metaClient, props, 
fileSplit.getStart(),
         fileSplit.getLength(), false);
     this.fileGroupReader.initRecordIterators();
@@ -208,8 +205,8 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
   /**
    * Convert FileSplit to FileSlice, but save the locations in 'hosts' because 
that data is otherwise lost.
    */
-  private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, 
String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
-    BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs);
+  private static FileSlice getFileSliceFromSplit(FileSplit split, FileSystem 
fs, String tableBasePath) throws IOException {
+    BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, fs);
     if (split instanceof RealtimeSplit) {
       // MOR
       RealtimeSplit realtimeSplit = (RealtimeSplit) split;
@@ -227,26 +224,23 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
       if (isLogFile) {
         return new FileSlice(fileGroupId, commitTime, null, 
realtimeSplit.getDeltaLogFiles());
       }
-      hosts.put(realtimeSplit.getPath().toString(), 
realtimeSplit.getLocations());
-      HoodieBaseFile hoodieBaseFile = new 
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(realtimeSplit.getPath())),
 bootstrapBaseFile);
+      HoodieBaseFile hoodieBaseFile = new 
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(realtimeSplit.getPath()),
 realtimeSplit.getLocations()), bootstrapBaseFile);
       return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, 
realtimeSplit.getDeltaLogFiles());
     }
     // COW
     HoodieFileGroupId fileGroupId = new 
HoodieFileGroupId(getFileId(split.getPath().getName()), 
getRelativePartitionPath(new Path(tableBasePath), split.getPath()));
-    hosts.put(split.getPath().toString(), split.getLocations());
     return new FileSlice(
         fileGroupId,
         getCommitTime(split.getPath().toString()),
-        new 
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(split.getPath())), 
bootstrapBaseFile),
+        new 
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(split.getPath()), 
split.getLocations()), bootstrapBaseFile),
         Collections.emptyList());
   }
 
-  private static BaseFile createBootstrapBaseFile(FileSplit split, Map<String, 
String[]> hosts, FileSystem fs) throws IOException {
+  private static BaseFile createBootstrapBaseFile(FileSplit split, FileSystem 
fs) throws IOException {
     if (split instanceof BootstrapBaseFileSplit) {
       BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit) 
split;
       FileSplit bootstrapFileSplit = 
bootstrapBaseFileSplit.getBootstrapFileSplit();
-      hosts.put(bootstrapFileSplit.getPath().toString(), 
bootstrapFileSplit.getLocations());
-      return new 
BaseFile(convertToStoragePathInfo(fs.getFileStatus(bootstrapFileSplit.getPath())));
+      return new 
BaseFile(convertToStoragePathInfo(fs.getFileStatus(bootstrapFileSplit.getPath()),
 bootstrapFileSplit.getLocations()));
     }
     return null;
   }
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java b/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
index 1c1ebc32a2f..46e4307c294 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
@@ -24,6 +24,7 @@ import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
 
 import java.io.Serializable;
+import java.util.Arrays;
 
 /**
  * Represents the information of a storage path representing a directory or a 
file.
@@ -38,6 +39,7 @@ public class StoragePathInfo implements Serializable, 
Comparable<StoragePathInfo
   private final short blockReplication;
   private final long blockSize;
   private final long modificationTime;
+  private final String[] locations;
 
   public StoragePathInfo(StoragePath path,
                          long length,
@@ -45,12 +47,24 @@ public class StoragePathInfo implements Serializable, 
Comparable<StoragePathInfo
                          short blockReplication,
                          long blockSize,
                          long modificationTime) {
+    this(path, length, isDirectory, blockReplication,
+        blockSize, modificationTime, null);
+  }
+
+  public StoragePathInfo(StoragePath path,
+                         long length,
+                         boolean isDirectory,
+                         short blockReplication,
+                         long blockSize,
+                         long modificationTime,
+                         String[] locations) {
     this.path = path;
     this.length = length;
     this.isDirectory = isDirectory;
     this.blockReplication = blockReplication;
     this.blockSize = blockSize;
     this.modificationTime = modificationTime;
+    this.locations = locations;
   }
 
   /**
@@ -109,6 +123,14 @@ public class StoragePathInfo implements Serializable, 
Comparable<StoragePathInfo
     return modificationTime;
   }
 
+  /**
+   * @return the locations of the file in the file system, possibly null.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public String[] getLocations() {
+    return locations;
+  }
+
   @Override
   public int compareTo(StoragePathInfo o) {
     return this.getPath().compareTo(o.getPath());
@@ -144,6 +166,7 @@ public class StoragePathInfo implements Serializable, 
Comparable<StoragePathInfo
         + ", blockReplication=" + blockReplication
         + ", blockSize=" + blockSize
         + ", modificationTime=" + modificationTime
+        + ", locations=" + Arrays.toString(locations)
         + '}';
   }
 }

Reply via email to