alexeykudinkin commented on code in PR #6680:
URL: https://github.com/apache/hudi/pull/6680#discussion_r984047423
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
listQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
Review Comment:
We don't need this field anymore we can use `isPartitionedTable` method
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
listQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadFileSlicesForPartition);
+ }
+
+ private List<FileSlice> loadFileSlicesForPartition(PartitionPath p) {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
Review Comment:
Let's actually extract composing of the relative partition path (from
values) into a standalone method. Then we can get eliminate this one and then
just do:
```
val relativePaths = composeRelativePartitionPaths(partitionCols,
partitionVals)
listQueryPartitionPaths(relativePaths)
```
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
Review Comment:
Let's do following:
- Make `loadAllQueryPartitionPaths` a) static, b) accept list of paths to
list, c) return listed partition paths (this will be necessary in other place
as well)
- Reshape our conditional here like following:
```
if (cached == null) {
this.cached = load(...)
}
return cached;
```
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -138,7 +143,20 @@ public BaseHoodieTableFileIndex(HoodieEngineContext
engineContext,
this.engineContext = engineContext;
this.fileStatusCache = fileStatusCache;
- doRefresh();
+ /**
+ * The `shouldRefresh` variable controls how we initialize the
TableFileIndex:
Review Comment:
Let's actually
- Call it `shouldListLazily`
- Rename `isAllInputFileSlicesCached` accordingly
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
listQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
Review Comment:
This will refresh the whole cache. We don't want that, instead we want to
load the partition paths we haven't already listed.
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
listQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadFileSlicesForPartition);
+ }
+
+ private List<FileSlice> loadFileSlicesForPartition(PartitionPath p) {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
+ LOG.info("The input partition names or value is empty, fallback to
return all partition paths");
+ return getAllQueryPartitionPaths();
+ }
+
+ if (cachedAllPartitionPaths != null) {
+ LOG.info("All partition paths have already loaded, use it directly");
+ return cachedAllPartitionPaths;
+ }
+
+ boolean hiveStylePartitioning =
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+ boolean urlEncodePartitioning =
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+ Map<String, Integer> partitionNameToIdx = IntStream.range(0,
partitionNames.length)
+ .mapToObj(i -> Pair.of(i, partitionNames[i]))
+ .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+ StringBuilder queryPartitionPath = new StringBuilder();
+ int idx = 0;
+ for (; idx < partitionNames.length; ++idx) {
+ String columnNames = this.partitionColumns[idx];
+ if (partitionNameToIdx.containsKey(columnNames)) {
+ int k = partitionNameToIdx.get(columnNames);
+ String value = urlEncodePartitioning ?
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+ queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" :
"").append(value).append("/");
+ } else {
+ break;
+ }
+ }
+ queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+ // Return directly if all partition values are specified.
+ if (idx == this.partitionColumns.length) {
+ return Collections.singletonList(new
PartitionPath(queryPartitionPath.toString(),
parsePartitionColumnValues(partitionColumns, queryPartitionPath.toString())));
+ }
+ // The predicate forms a prefix of partition path, do listing to the path
only.
+ return
listQueryPartitionPaths(Collections.singletonList(queryPartitionPath.toString()));
+ }
+
+ private List<PartitionPath> listQueryPartitionPaths(List<String>
queryRelativePartitionPaths) {
Review Comment:
Let's rename this to be more generic `listPartitionPath` (it's listing any
paths not only query ones)
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ return this.cachedAllPartitionPaths;
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ });
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
+ LOG.info("The input partition names or value is empty, fallback to
return all partition paths");
+ return getAllQueryPartitionPaths();
+ }
+
+ if (cachedAllPartitionPaths != null) {
+ LOG.info("All partition paths have already loaded, use it directly");
+ return cachedAllPartitionPaths;
+ }
+
+ boolean hiveStylePartitioning =
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+ boolean urlEncodePartitioning =
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+ Map<String, Integer> partitionNamesToIdx = IntStream.range(0,
partitionNames.length)
+ .mapToObj(i -> Pair.of(i, partitionNames[i]))
+ .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+ StringBuilder queryPartitionPath = new StringBuilder();
+ int idx = 0;
+ for (; idx < partitionNames.length; ++idx) {
+ String columnNames = this.partitionColumns[idx];
+ if (partitionNamesToIdx.containsKey(columnNames)) {
+ int k = partitionNamesToIdx.get(columnNames);
+ String value = urlEncodePartitioning ?
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+ queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" :
"").append(value).append("/");
+ } else {
+ break;
+ }
+ }
+ queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+ // Return directly if all partition values are specified.
+ if (idx == this.partitionColumns.length) {
Review Comment:
De-duplication we should be perform in the method analyzing predicates
(which is a caller of this method).
It's not about a time complexity, it's more about the _code complexity_ --
making it invariant of the idx simplifies this code
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +189,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return this.cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadInputFileSlicesOfPartition);
+ }
+
+ private List<FileSlice> loadInputFileSlicesOfPartition(PartitionPath p) {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
Review Comment:
There's already utility called `Lazy`
P.S. I actually realized we can't use Lazy here b/c this object needs to be
serializable (and Lazy could not be unfortunately)
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
Review Comment:
Actually, reading on my comment here seems like we can actually inline
`loadAllQueryPartitionPaths` it altogether after we remove
`queryAsNonePartitionedTable`
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
listQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
Review Comment:
We already have that method actually (`getCachedInputFileSlices`) we just
need to generalize it to be able to batch-list partitions
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -179,15 +197,125 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (cachedAllPartitionPaths != null) {
+ return cachedAllPartitionPaths;
+ }
+
+ loadAllQueryPartitionPaths();
+ return cachedAllPartitionPaths;
+ }
+
+ private void loadAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
listQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition,
this::loadFileSlicesForPartition);
+ }
+
+ private List<FileSlice> loadFileSlicesForPartition(PartitionPath p) {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
+ LOG.info("The input partition names or value is empty, fallback to
return all partition paths");
+ return getAllQueryPartitionPaths();
+ }
+
+ if (cachedAllPartitionPaths != null) {
+ LOG.info("All partition paths have already loaded, use it directly");
+ return cachedAllPartitionPaths;
+ }
+
+ boolean hiveStylePartitioning =
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+ boolean urlEncodePartitioning =
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+ Map<String, Integer> partitionNameToIdx = IntStream.range(0,
partitionNames.length)
+ .mapToObj(i -> Pair.of(i, partitionNames[i]))
+ .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+ StringBuilder queryPartitionPath = new StringBuilder();
+ int idx = 0;
+ for (; idx < partitionNames.length; ++idx) {
+ String columnNames = this.partitionColumns[idx];
+ if (partitionNameToIdx.containsKey(columnNames)) {
+ int k = partitionNameToIdx.get(columnNames);
+ String value = urlEncodePartitioning ?
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+ queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" :
"").append(value).append("/");
+ } else {
+ break;
+ }
+ }
+ queryPartitionPath.deleteCharAt(queryPartitionPath.length() - 1);
+ // Return directly if all partition values are specified.
+ if (idx == this.partitionColumns.length) {
+ return Collections.singletonList(new
PartitionPath(queryPartitionPath.toString(),
parsePartitionColumnValues(partitionColumns, queryPartitionPath.toString())));
+ }
+ // The predicate forms a prefix of partition path, do listing to the path
only.
+ return
listQueryPartitionPaths(Collections.singletonList(queryPartitionPath.toString()));
+ }
+
+ private List<PartitionPath> listQueryPartitionPaths(List<String>
queryRelativePartitionPaths) {
Review Comment:
We can also make it static
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -323,30 +424,13 @@ private void doRefresh() {
.mapToLong(BaseHoodieTableFileIndex::fileSliceSize)
.sum();
- // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
- queryAsNonePartitionedTable = partitionFiles.keySet().stream().anyMatch(p
-> p.values.length == 0);
Review Comment:
Correct
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]