This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 97b4d591aa0 [HUDI-8073] Add hosts to storage path info and use it if
present (#11761)
97b4d591aa0 is described below
commit 97b4d591aa07c5ec8bbebf748fd5383c30393914
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 16 18:31:09 2024 -0400
[HUDI-8073] Add hosts to storage path info and use it if present (#11761)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Shawn Chang <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/common/engine/HoodieReaderContext.java | 19 +++++++++++++++
.../apache/hudi/common/model/HoodieBaseFile.java | 2 +-
.../common/table/read/HoodieFileGroupReader.java | 27 +++++++++++++++++-----
.../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 11 +++++++++
.../hudi/hadoop/HiveHoodieReaderContext.java | 21 +++++++++++++----
.../HoodieFileGroupReaderBasedRecordReader.java | 22 +++++++-----------
.../org/apache/hudi/storage/StoragePathInfo.java | 23 ++++++++++++++++++
7 files changed, 99 insertions(+), 26 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index b12a11d4b57..5eec66d15e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -159,6 +160,24 @@ public abstract class HoodieReaderContext<T> {
StoragePath filePath, long start, long length, Schema dataSchema, Schema
requiredSchema,
HoodieStorage storage) throws IOException;
+ /**
+ * Gets the record iterator based on the type of engine-specific record
representation from the
+ * file.
+ *
+ * @param storagePathInfo {@link StoragePathInfo} instance of a file.
+ * @param start Starting byte to start reading.
+ * @param length Bytes to read.
+ * @param dataSchema Schema of records in the file in {@link Schema}.
+ * @param requiredSchema Schema containing required fields to read in
{@link Schema} for projection.
+ * @param storage {@link HoodieStorage} for reading records.
+ * @return {@link ClosableIterator} that can return all records through
iteration.
+ */
+ public ClosableIterator<T> getFileRecordIterator(
+ StoragePathInfo storagePathInfo, long start, long length, Schema
dataSchema, Schema requiredSchema,
+ HoodieStorage storage) throws IOException {
+ return getFileRecordIterator(storagePathInfo.getPath(), start, length,
dataSchema, requiredSchema, storage);
+ }
+
/**
* Converts an Avro record, e.g., serialized in the log files, to an
engine-specific record.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
index 5b8c3fcb11f..f5631078270 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
@@ -145,7 +145,7 @@ public class HoodieBaseFile extends BaseFile {
StoragePath parent = pathInfo.getPath().getParent();
return new StoragePathInfo(
new StoragePath(parent, fileId), pathInfo.getLength(),
pathInfo.isDirectory(),
- pathInfo.getBlockReplication(), pathInfo.getBlockSize(),
pathInfo.getModificationTime());
+ pathInfo.getBlockReplication(), pathInfo.getBlockSize(),
pathInfo.getModificationTime(), pathInfo.getLocations());
} else {
return pathInfo;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 1c59e8f0ba2..e99d076e62f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -41,6 +41,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.avro.Schema;
@@ -145,10 +146,18 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return makeBootstrapBaseFileIterator(baseFile);
}
- return readerContext.getFileRecordIterator(
- baseFile.getStoragePath(), start, length,
- readerContext.getSchemaHandler().getDataSchema(),
- readerContext.getSchemaHandler().getRequiredSchema(), storage);
+ StoragePathInfo baseFileStoragePathInfo = baseFile.getPathInfo();
+ if (baseFileStoragePathInfo != null) {
+ return readerContext.getFileRecordIterator(
+ baseFileStoragePathInfo, start, length,
+ readerContext.getSchemaHandler().getDataSchema(),
+ readerContext.getSchemaHandler().getRequiredSchema(), storage);
+ } else {
+ return readerContext.getFileRecordIterator(
+ baseFile.getStoragePath(), start, length,
+ readerContext.getSchemaHandler().getDataSchema(),
+ readerContext.getSchemaHandler().getRequiredSchema(), storage);
+ }
}
private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile
baseFile) throws IOException {
@@ -186,8 +195,14 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return Option.empty();
}
Schema requiredSchema =
readerContext.getSchemaHandler().createSchemaFromFields(requiredFields);
- return
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0,
file.getFileLen(),
- readerContext.getSchemaHandler().createSchemaFromFields(allFields),
requiredSchema, storage), requiredSchema));
+ StoragePathInfo fileStoragePathInfo = file.getPathInfo();
+ if (fileStoragePathInfo != null) {
+ return
Option.of(Pair.of(readerContext.getFileRecordIterator(fileStoragePathInfo, 0,
file.getFileLen(),
+ readerContext.getSchemaHandler().createSchemaFromFields(allFields),
requiredSchema, storage), requiredSchema));
+ } else {
+ return
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0,
file.getFileLen(),
+ readerContext.getSchemaHandler().createSchemaFromFields(allFields),
requiredSchema, storage), requiredSchema));
+ }
}
/**
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index e8e92f6b420..655ebb2f7bd 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -171,6 +171,17 @@ public class HadoopFSUtils {
fileStatus.getModificationTime());
}
+ public static StoragePathInfo convertToStoragePathInfo(FileStatus
fileStatus, String[] locations) {
+ return new StoragePathInfo(
+ convertToStoragePath(fileStatus.getPath()),
+ fileStatus.getLen(),
+ fileStatus.isDirectory(),
+ fileStatus.getReplication(),
+ fileStatus.getBlockSize(),
+ fileStatus.getModificationTime(),
+ locations);
+ }
+
/**
* @param pathInfo {@link StoragePathInfo} instance.
* @return the {@link FileStatus} instance after conversion.
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 46fcba63112..d2b6ccd8ee8 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -37,6 +37,7 @@ import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -76,7 +77,6 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
protected final JobConf jobConf;
protected final Reporter reporter;
protected final Schema writerSchema;
- protected Map<String, String[]> hosts;
protected final Map<String, TypeInfo> columnTypeMap;
private final ObjectInspectorCache objectInspectorCache;
private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
@@ -91,14 +91,12 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
JobConf jobConf,
Reporter reporter,
Schema writerSchema,
- Map<String, String[]> hosts,
HoodieTableMetaClient metaClient) {
this.readerCreator = readerCreator;
this.split = split;
this.jobConf = jobConf;
this.reporter = reporter;
this.writerSchema = writerSchema;
- this.hosts = hosts;
this.partitionCols = getPartitionFieldNames(jobConf).stream().filter(n ->
writerSchema.getField(n) != null).collect(Collectors.toList());
this.partitionColSet = new HashSet<>(this.partitionCols);
String tableName = metaClient.getTableConfig().getTableName();
@@ -141,14 +139,27 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
}
@Override
- public ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath
filePath, long start, long length, Schema dataSchema, Schema requiredSchema,
HoodieStorage storage) throws IOException {
+ public ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath
filePath, long start, long length, Schema dataSchema,
+ Schema
requiredSchema, HoodieStorage storage) throws IOException {
+ return getFileRecordIterator(filePath, null, start, length, dataSchema,
requiredSchema, storage);
+ }
+
+ @Override
+ public ClosableIterator<ArrayWritable> getFileRecordIterator(
+ StoragePathInfo storagePathInfo, long start, long length, Schema
dataSchema, Schema requiredSchema,
+ HoodieStorage storage) throws IOException {
+ return getFileRecordIterator(storagePathInfo.getPath(),
storagePathInfo.getLocations(), start, length, dataSchema, requiredSchema,
storage);
+ }
+
+ private ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath
filePath, String[] hosts, long start, long length, Schema dataSchema,
+ Schema
requiredSchema, HoodieStorage storage) throws IOException {
JobConf jobConfCopy = new JobConf(jobConf);
//move the partition cols to the end, because in some cases it has issues
if we don't do that
Schema modifiedDataSchema =
HoodieAvroUtils.generateProjectionSchema(dataSchema,
Stream.concat(dataSchema.getFields().stream()
.map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n ->
!partitionColSet.contains(n)),
partitionCols.stream().filter(c -> dataSchema.getField(c) !=
null)).collect(Collectors.toList()));
setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema);
- InputSplit inputSplit = new FileSplit(new Path(filePath.toString()),
start, length, hosts.get(filePath.toString()));
+ InputSplit inputSplit = new FileSplit(new Path(filePath.toString()),
start, length, hosts);
RecordReader<NullWritable, ArrayWritable> recordReader =
readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter);
if (firstRecordReader == null) {
firstRecordReader = recordReader;
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index e26e4b0efa9..f72b79b56e1 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -54,10 +54,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
-import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -117,8 +115,7 @@ public class HoodieFileGroupReaderBasedRecordReader
implements RecordReader<Null
String latestCommitTime = getLatestCommitTime(split, metaClient);
Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy,
latestCommitTime);
Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
- Map<String, String[]> hosts = new HashMap<>();
- this.readerContext = new HiveHoodieReaderContext(readerCreator, split,
jobConfCopy, reporter, tableSchema, hosts, metaClient);
+ this.readerContext = new HiveHoodieReaderContext(readerCreator, split,
jobConfCopy, reporter, tableSchema, metaClient);
this.arrayWritable = new ArrayWritable(Writable.class, new
Writable[requestedSchema.getFields().size()]);
TypedProperties props = metaClient.getTableConfig().getProps();
jobConf.forEach(e -> {
@@ -128,7 +125,7 @@ public class HoodieFileGroupReaderBasedRecordReader
implements RecordReader<Null
});
LOG.debug("Creating HoodieFileGroupReaderRecordReader with
tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath,
latestCommitTime, fileSplit.getPath());
this.fileGroupReader = new HoodieFileGroupReader<>(readerContext,
metaClient.getStorage(), tableBasePath,
- latestCommitTime, getFileSliceFromSplit(fileSplit, hosts,
getFs(tableBasePath, jobConfCopy), tableBasePath),
+ latestCommitTime, getFileSliceFromSplit(fileSplit,
getFs(tableBasePath, jobConfCopy), tableBasePath),
tableSchema, requestedSchema, Option.empty(), metaClient, props,
fileSplit.getStart(),
fileSplit.getLength(), false);
this.fileGroupReader.initRecordIterators();
@@ -208,8 +205,8 @@ public class HoodieFileGroupReaderBasedRecordReader
implements RecordReader<Null
/**
* Convert FileSplit to FileSlice, keeping the split locations in the
StoragePathInfo of each base file because that data is otherwise lost.
*/
- private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String,
String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
- BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs);
+ private static FileSlice getFileSliceFromSplit(FileSplit split, FileSystem
fs, String tableBasePath) throws IOException {
+ BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, fs);
if (split instanceof RealtimeSplit) {
// MOR
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
@@ -227,26 +224,23 @@ public class HoodieFileGroupReaderBasedRecordReader
implements RecordReader<Null
if (isLogFile) {
return new FileSlice(fileGroupId, commitTime, null,
realtimeSplit.getDeltaLogFiles());
}
- hosts.put(realtimeSplit.getPath().toString(),
realtimeSplit.getLocations());
- HoodieBaseFile hoodieBaseFile = new
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(realtimeSplit.getPath())),
bootstrapBaseFile);
+ HoodieBaseFile hoodieBaseFile = new
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(realtimeSplit.getPath()),
realtimeSplit.getLocations()), bootstrapBaseFile);
return new FileSlice(fileGroupId, commitTime, hoodieBaseFile,
realtimeSplit.getDeltaLogFiles());
}
// COW
HoodieFileGroupId fileGroupId = new
HoodieFileGroupId(getFileId(split.getPath().getName()),
getRelativePartitionPath(new Path(tableBasePath), split.getPath()));
- hosts.put(split.getPath().toString(), split.getLocations());
return new FileSlice(
fileGroupId,
getCommitTime(split.getPath().toString()),
- new
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(split.getPath())),
bootstrapBaseFile),
+ new
HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(split.getPath()),
split.getLocations()), bootstrapBaseFile),
Collections.emptyList());
}
- private static BaseFile createBootstrapBaseFile(FileSplit split, Map<String,
String[]> hosts, FileSystem fs) throws IOException {
+ private static BaseFile createBootstrapBaseFile(FileSplit split, FileSystem
fs) throws IOException {
if (split instanceof BootstrapBaseFileSplit) {
BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit)
split;
FileSplit bootstrapFileSplit =
bootstrapBaseFileSplit.getBootstrapFileSplit();
- hosts.put(bootstrapFileSplit.getPath().toString(),
bootstrapFileSplit.getLocations());
- return new
BaseFile(convertToStoragePathInfo(fs.getFileStatus(bootstrapFileSplit.getPath())));
+ return new
BaseFile(convertToStoragePathInfo(fs.getFileStatus(bootstrapFileSplit.getPath()),
bootstrapFileSplit.getLocations()));
}
return null;
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
b/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
index 1c1ebc32a2f..46e4307c294 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StoragePathInfo.java
@@ -24,6 +24,7 @@ import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import java.io.Serializable;
+import java.util.Arrays;
/**
* Represents the information of a storage path representing a directory or a
file.
@@ -38,6 +39,7 @@ public class StoragePathInfo implements Serializable,
Comparable<StoragePathInfo
private final short blockReplication;
private final long blockSize;
private final long modificationTime;
+ private final String[] locations;
public StoragePathInfo(StoragePath path,
long length,
@@ -45,12 +47,24 @@ public class StoragePathInfo implements Serializable,
Comparable<StoragePathInfo
short blockReplication,
long blockSize,
long modificationTime) {
+ this(path, length, isDirectory, blockReplication,
+ blockSize, modificationTime, null);
+ }
+
+ public StoragePathInfo(StoragePath path,
+ long length,
+ boolean isDirectory,
+ short blockReplication,
+ long blockSize,
+ long modificationTime,
+ String[] locations) {
this.path = path;
this.length = length;
this.isDirectory = isDirectory;
this.blockReplication = blockReplication;
this.blockSize = blockSize;
this.modificationTime = modificationTime;
+ this.locations = locations;
}
/**
@@ -109,6 +123,14 @@ public class StoragePathInfo implements Serializable,
Comparable<StoragePathInfo
return modificationTime;
}
+ /**
+ * @return the locations of the file in the file system, possibly null.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public String[] getLocations() {
+ return locations;
+ }
+
@Override
public int compareTo(StoragePathInfo o) {
return this.getPath().compareTo(o.getPath());
@@ -144,6 +166,7 @@ public class StoragePathInfo implements Serializable,
Comparable<StoragePathInfo
+ ", blockReplication=" + blockReplication
+ ", blockSize=" + blockSize
+ ", modificationTime=" + modificationTime
+ + ", locations=" + Arrays.toString(locations)
+ '}';
}
}