yihua commented on code in PR #18136:
URL: https://github.com/apache/hudi/pull/18136#discussion_r2785120658
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
-    try (HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());
Review Comment:
Have you considered what happens with MOR tables here? The
`HoodieROTablePathFilter` only returns base files (it calls
`fsView.getLatestBaseFiles()`), so this path constructs FileSlices without log
files. The `!shouldIncludePendingCommits` guard doesn't prevent MOR tables from
reaching this code. It might be worth adding a table-type check (COW only) or
documenting this limitation.
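If a guard is added, it could look roughly like this (a hedged sketch; `TableType` is a stand-in for Hudi's `HoodieTableType`, and the helper name is illustrative):

```java
// Sketch of the suggested COW-only guard. TableType stands in for Hudi's
// HoodieTableType; canUseRoPathFilter is a hypothetical helper name.
enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

class RoFilterGuard {
  // The base-file-only shortcut is only safe when log files cannot exist,
  // i.e. for COPY_ON_WRITE tables with pending commits excluded.
  static boolean canUseRoPathFilter(TableType tableType,
                                    boolean useROPathFilterForListing,
                                    boolean shouldIncludePendingCommits) {
    return useROPathFilterForListing
        && !shouldIncludePendingCommits
        && tableType == TableType.COPY_ON_WRITE;
  }
}
```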
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -439,6 +443,21 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
private def arePartitionPathsUrlEncoded: Boolean =
metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
+
+  override protected def getPartitionPathFilter(activeTimeline: HoodieTimeline): org.apache.hudi.common.util.Option[org.apache.hudi.storage.StoragePathFilter] = {
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      val conf = HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration)
+      if (specifiedQueryInstant.isDefined) {
+        conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), specifiedQueryInstant.get)
+      }
Review Comment:
`HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration)` wraps
the shared Hadoop config without copying it. The subsequent
`conf.set(TIMESTAMP_AS_OF, ...)` would mutate the global Spark session config,
which could affect other queries in the same session. Could you use
`getStorageConfWithCopy` instead?
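To illustrate the hazard generically (using `java.util.Properties` as a stand-in for the Hadoop `Configuration`, and a placeholder key rather than the real `TIMESTAMP_AS_OF` key):

```java
import java.util.Properties;

// Copy-before-mutate pattern, analogous to getStorageConfWithCopy: the
// shared session config stays untouched while the per-query copy carries
// the extra setting. The key name below is a placeholder.
class ConfigCopyDemo {
  static Properties withQueryInstant(Properties sharedConf, String instant) {
    Properties copy = new Properties();
    copy.putAll(sharedConf);                                  // defensive copy
    copy.setProperty("placeholder.timestamp.as.of", instant); // per-query only
    return copy;
  }
}
```

Setting the key on the copy leaves the shared config unchanged, which is the behavior the review is asking for.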
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
-    try (HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      // Group files by partition path, then by file group ID
+      Map<String, PartitionPath> partitionsMap = new HashMap<>();
+      partitions.forEach(p -> partitionsMap.put(p.path, p));
+      Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>();
+
Review Comment:
The `partitionPathStr` here is the absolute path
(`pathInfo.getPath().getParent().toString()`), but `FileSlice` expects a
relative partition path. The existing code path via `HoodieTableFileSystemView`
always uses relative paths. This would cause mismatches downstream wherever
`FileSlice.getPartitionPath()` is used. Should this be `relPartitionPath`
instead?
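The mismatch can be seen with plain `java.net.URI` relativization (used here as a stand-in for `FSUtils.getRelativePartitionPath`; the paths are made up):

```java
import java.net.URI;

// Deriving the relative partition path from the table base path. The
// absolute parent string can never equal a key stored as a relative path.
class RelPartitionPathDemo {
  static String relativize(String basePath, String partitionDir) {
    // Assumes basePath has no trailing slash; one is appended for relativize.
    return URI.create(basePath + "/").relativize(URI.create(partitionDir)).getPath();
  }
}
```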
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
-    try (HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"),
Review Comment:
nit: `queryInstant.map(instant -> instant).orElse("N/A")` — the
`.map(instant -> instant)` is a no-op. You can simplify to
`queryInstant.orElse("N/A")`.
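Shown with `java.util.Optional` (Hudi's own `Option` behaves the same way for this case), the two forms are equivalent:

```java
import java.util.Optional;

// The identity map adds nothing: both methods return the same result for
// present and empty values.
class IdentityMapDemo {
  static String verbose(Optional<String> queryInstant) {
    return queryInstant.map(instant -> instant).orElse("N/A");
  }

  static String simplified(Optional<String> queryInstant) {
    return queryInstant.orElse("N/A");
  }
}
```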
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
-    try (HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files {} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant -> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      // Group files by partition path, then by file group ID
+      Map<String, PartitionPath> partitionsMap = new HashMap<>();
+      partitions.forEach(p -> partitionsMap.put(p.path, p));
+      Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new HashMap<>();
+
+      for (StoragePathInfo pathInfo : allFiles) {
+        // Create FileSlice obj from StoragePathInfo.
+        String partitionPathStr = pathInfo.getPath().getParent().toString();
+        String relPartitionPath = FSUtils.getRelativePartitionPath(basePath, pathInfo.getPath().getParent());
+        HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
Review Comment:
If `relPartitionPath` doesn't exactly match a key in `partitionsMap`,
`partitionPathObj` will be null and the `computeIfAbsent` call below will throw
NPE. This could happen with path normalization differences (trailing slashes,
scheme differences). Could you add a null check or use
`getRelativePartitionPath` consistently with how `PartitionPath.path` was
originally set?
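The failure mode can be reproduced with a plain `HashMap` (the partition-path keys below are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Any normalization difference between the stored key and the derived key
// (trailing slash, scheme prefix, etc.) makes the lookup return null.
class PartitionLookupDemo {
  static String lookup(Map<String, String> partitionsMap, String derivedKey) {
    return partitionsMap.get(derivedKey);
  }
}
```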
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -509,6 +509,7 @@ object HoodieFileIndex extends Logging {
}
  def getConfigProperties(spark: SparkSession, options: Map[String, String], tableConfig: HoodieTableConfig): TypedProperties = {
+ logInfo("Options provided to the file index are " + options)
val sqlConf: SQLConf = spark.sessionState.conf
Review Comment:
nit: this logs all options at INFO level on every file index creation —
could be quite noisy in production. Would `logDebug` be more appropriate here?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -156,6 +157,12 @@ List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> re
   Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitionPaths)
       throws IOException;
+  default Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitionPaths,
+                                                                     Option<StoragePathFilter> pathFilterOption)
+      throws IOException {
+    return getAllFilesInPartitions(partitionPaths);
+  }
Review Comment:
Could `getAllFilesInPartitions(Collection<String> partitionPaths)` get a default implementation of `getAllFilesInPartitions(partitionPaths, Option.empty())` so subclasses can avoid the repeated code? Then `getAllFilesInPartitions(Collection<String> partitionPaths, Option<StoragePathFilter> pathFilterOption)` becomes the abstract method.
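A minimal sketch of the suggested shape (types simplified: `String` stands in for `StoragePathInfo`, `Optional<String>` for `Option<StoragePathFilter>`):

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// The two-arg overload is the single abstract method; the one-arg variant
// delegates with an empty filter, so implementers write the logic once.
interface FilesInPartitions {
  Map<String, List<String>> getAllFilesInPartitions(Collection<String> partitionPaths,
                                                    Optional<String> pathFilterOption);

  default Map<String, List<String>> getAllFilesInPartitions(Collection<String> partitionPaths) {
    return getAllFilesInPartitions(partitionPaths, Optional.empty());
  }
}
```

With this inversion, implementations such as `BaseTableMetadata` override only the two-arg method and the one-arg call site keeps working unchanged.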
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java:
##########
@@ -99,25 +100,25 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
   private transient HoodieLocalEngineContext engineContext;
-
   private transient HoodieStorage storage;
   public HoodieROTablePathFilter() {
-    this(new Configuration());
+    this(HadoopFSUtils.getStorageConf());
Review Comment:
`HoodieROTablePathFilter` and `BaseFileOnlyRelation` should no longer be
used based on the latest master; instead,
`HoodieCopyOnWriteSnapshotHadoopFsRelationFactory` is used.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -146,6 +147,12 @@ public List<StoragePathInfo> getAllFilesInPartition(StoragePath partitionPath) t
}
@Override
Review Comment:
It looks like the `@Override` annotation that belonged to
`getAllFilesInPartitions(Collection<String>)` has been absorbed by the new
method insertion. In the diff, the `@Override` on line 148 now applies to the
new two-arg overload, while the original single-arg method (which is the actual
interface abstract method) loses its `@Override`. Could you add `@Override`
back to the original method?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -146,6 +147,12 @@ public List<StoragePathInfo> getAllFilesInPartition(StoragePath partitionPath) t
   }
   @Override
+  public Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitions,
+                                                                    Option<StoragePathFilter> unused)
+      throws IOException {
+    return getAllFilesInPartitions(partitions);
+  }
+
   public Map<String, List<StoragePathInfo>> getAllFilesInPartitions(Collection<String> partitions)
Review Comment:
nit: could this delegate to `getAllFilesInPartitions(partitions, Option.empty())` so the relationship between the two overloads is easier to read?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,6 +528,21 @@ object HoodieFileIndex extends Logging {
       properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
     }
+    var hoodieROTablePathFilterBasedFileListingEnabled = getConfigValue(options, sqlConf,
+      DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, null)
+    if (hoodieROTablePathFilterBasedFileListingEnabled != null) {
+      properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+        hoodieROTablePathFilterBasedFileListingEnabled)
+    } else {
+      // For 0.14 rollout we also allow passing in the HMS listing config via Spark itself
+      hoodieROTablePathFilterBasedFileListingEnabled = getConfigValue(options, sqlConf,
+        "spark." + DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, null)
+      if (hoodieROTablePathFilterBasedFileListingEnabled != null) {
+        properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
Review Comment:
nit: the comment says "For 0.14 rollout" — looks like this was copied from
the HMS listing config block. This is a new 1.2.0 config, so the comment is
misleading.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]