alexeykudinkin commented on code in PR #6680:
URL: https://github.com/apache/hudi/pull/6680#discussion_r980388571
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -139,7 +144,11 @@ public BaseHoodieTableFileIndex(HoodieEngineContext
engineContext,
this.engineContext = engineContext;
this.fileStatusCache = fileStatusCache;
- doRefresh();
+ if (shouldRefresh) {
Review Comment:
Let's add a comment explaining the difference between these branches
(elaborating on the lazy semantics)
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
Review Comment:
nit: We can avoid prefixing `this.` to fields when accessing; this is
usually reserved for assigning fields w/in ctors when a local var w/ the same
name is present in the scope
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return this.cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadInputFileSlicesOfPartition);
+ }
+
+ private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
+ LOG.info("The input partition names or value is empty, fallback to
return all partition paths");
+ return getAllQueryPartitionPaths();
+ }
+
+ if (cachedAllPartitionPaths != null) {
+ LOG.info("All partition paths have already loaded, use it directly");
+ return cachedAllPartitionPaths;
+ }
+
+ boolean hiveStylePartitioning =
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+ boolean urlEncodePartitioning =
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+ Map<String, Integer> partitionNameToIdx = IntStream.range(0,
partitionNames.length)
+ .mapToObj(i -> Pair.of(i, partitionNames[i]))
+ .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+ StringBuilder queryPartitionPath = new StringBuilder();
+ int idx = 0;
+ for (; idx < partitionNames.length; ++idx) {
+ String columnNames = this.partitionColumns[idx];
+ if (partitionNameToIdx.containsKey(columnNames)) {
+ int k = partitionNameToIdx.get(columnNames);
+ String value = urlEncodePartitioning ?
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+ queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" :
"").append(value).append("/");
+ } else {
+ break;
+ }
+ }
+ queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+ // Return directly if all partition values are specified.
+ if (idx == this.partitionColumns.length) {
+ return Collections.singletonList(new
PartitionPath(queryPartitionPath.toString(),
parsePartitionColumnValues(partitionColumns, queryPartitionPath.toString())));
+ }
+ // The predicate forms a prefix of partition path, do listing to the path
only.
+ return
getQueryPartitionPaths(Collections.singletonList(queryPartitionPath.toString()));
+ }
+
+ private List<PartitionPath> getQueryPartitionPaths(List<String>
queryRelativePartitionPaths) {
Review Comment:
nit: Let's rename this to `listQueryPartitionPath` to make it apparent that
this will entail an MT/FS listing operation
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return this.cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadInputFileSlicesOfPartition);
+ }
+
+ private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
Review Comment:
Let's create a `Lazy` field for this one (this is not supposed to change for
a given FileIndex instance)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -165,7 +164,7 @@ case class HoodieFileIndex(spark: SparkSession,
logInfo(s"Total base files: $totalFileSize; " +
s"candidate files after data skipping : $candidateFileSize; " +
- s"skipping percent ${if (allFiles.nonEmpty && totalFileSize > 0)
(totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
+ s"skipping percent ${if (isAllInputFileSlicesCached &&
allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) /
totalFileSize.toDouble else 0}")
Review Comment:
If we're loading lazily let's instead output `"is disabled"`
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return this.cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadInputFileSlicesOfPartition);
+ }
+
+ private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
Review Comment:
nit: `loadFileSlicesForPartition`
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return this.cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadInputFileSlicesOfPartition);
+ }
+
+ private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
+ LOG.info("The input partition names or value is empty, fallback to
return all partition paths");
+ return getAllQueryPartitionPaths();
+ }
+
+ if (cachedAllPartitionPaths != null) {
+ LOG.info("All partition paths have already loaded, use it directly");
+ return cachedAllPartitionPaths;
+ }
+
+ boolean hiveStylePartitioning =
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+ boolean urlEncodePartitioning =
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
Review Comment:
Let's add a tracking task to consolidate this logic w/in key-gens
##########
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java:
##########
@@ -74,8 +75,13 @@ public List<String> getAllPartitionPaths() throws
IOException {
return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs,
datasetBasePath);
}
+ return getPartitionPathsWithPrefix("");
+ }
+
+ @Override
+ public List<String> getPartitionPathsWithPrefix(String prefix) throws
IOException {
List<Path> pathsToList = new CopyOnWriteArrayList<>();
- pathsToList.add(basePath);
+ pathsToList.add(StringUtils.isNullOrEmpty(prefix) ? new
Path(datasetBasePath) : new Path(datasetBasePath, prefix));
Review Comment:
This implementation currently assumes that either a) the prefix is "" or b) the
prefix is an actual relative path (instead of, say, a prefix of the folder
name), which could be misleading.
Instead, i'd suggest we do the following:
- Let's create a separate method
`getPartitionPathsWithPrefix(Option<String> prefix)`
- Not allow empty prefixes to be passed in
- Make sure that implementation can handle actual prefixes (instead of
currently digesting only the full-paths)
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ return this.cachedAllPartitionPaths;
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
Review Comment:
Let's also add a comment explaining why we're doing full refresh here in
that case
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ return this.cachedAllPartitionPaths;
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ });
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
+ LOG.info("The input partition names or value is empty, fallback to
return all partition paths");
+ return getAllQueryPartitionPaths();
+ }
+
+ if (cachedAllPartitionPaths != null) {
+ LOG.info("All partition paths have already loaded, use it directly");
+ return cachedAllPartitionPaths;
+ }
+
+ boolean hiveStylePartitioning =
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+ boolean urlEncodePartitioning =
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+ Map<String, Integer> partitionNamesToIdx = IntStream.range(0,
partitionNames.length)
+ .mapToObj(i -> Pair.of(i, partitionNames[i]))
+ .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+ StringBuilder queryPartitionPath = new StringBuilder();
+ int idx = 0;
+ for (; idx < partitionNames.length; ++idx) {
+ String columnNames = this.partitionColumns[idx];
+ if (partitionNamesToIdx.containsKey(columnNames)) {
+ int k = partitionNamesToIdx.get(columnNames);
+ String value = urlEncodePartitioning ?
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+ queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" :
"").append(value).append("/");
+ } else {
+ break;
+ }
+ }
+ queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+ // Return directly if all partition values are specified.
+ if (idx == this.partitionColumns.length) {
Review Comment:
We don't need to use `idx` for it (we can use `partitionNames.length`)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -187,12 +189,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
* @param predicates The filter condition.
* @return The pruned partition paths.
*/
- protected def prunePartition(partitionPaths: Seq[PartitionPath], predicates:
Seq[Expression]): Seq[PartitionPath] = {
+ protected def prunePartition(predicates: Seq[Expression]):
Seq[PartitionPath] = {
Review Comment:
Let's rename this to `listMatchingPartitionPaths` (current name is
misaligned after signature change)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -209,7 +215,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
s" after partition prune size is: ${prunedPartitionPaths.size}")
prunedPartitionPaths
} else {
- partitionPaths
+ logInfo(s"No partition predicate provided, total partition size is:
${getAllQueryPartitionPaths.asScala.size}")
Review Comment:
Let's make sure we add comments elaborating on the lazy and non-lazy listing
flows
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -323,30 +424,13 @@ private void doRefresh() {
.mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
.sum();
- // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
- queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p
-> p.values.length == 0);
Review Comment:
Let's actually replace this flag w/ an `isPartitionedTable` predicate (sorry,
it's my bad that I missed cleaning it up last time)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -138,6 +138,14 @@ object DataSourceReadOptions {
" read from the data file (in Hudi partition columns are persisted by
default)." +
" This config is a fallback allowing to preserve existing behavior,
and should not be used otherwise.")
+ val REFRESH_PARTITION_AND_FILES_IN_INITIALIZATION: ConfigProperty[Boolean] =
Review Comment:
I don't think we need a config for that, instead:
- Lazy loading should be default behavior
- We should add a config that can override default behavior and always load
eagerly (fallback)
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ return this.cachedAllPartitionPaths;
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
Review Comment:
Understood. Let's rename this flag to actually associate it w/ lazy-loading
semantic: `shouldRefreshLazily`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]