alexeykudinkin commented on code in PR #6680:
URL: https://github.com/apache/hudi/pull/6680#discussion_r979076389
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -180,15 +185,118 @@ public void close() throws Exception {
}
protected List<PartitionPath> getAllQueryPartitionPaths() {
+ if (this.cachedAllPartitionPaths != null) {
+ return this.cachedAllPartitionPaths;
+ }
+
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
- // Load all the partition path from the basePath, and filter by the query
partition path.
- // TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
- .stream()
- .filter(path ->
queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
+ this.cachedAllPartitionPaths =
getQueryPartitionPaths(queryRelativePartitionPaths);
+
+ // If the partition value contains InternalRow.empty, we query it as a
non-partitioned table.
+ this.queryAsNonePartitionedTable =
this.cachedAllPartitionPaths.stream().anyMatch(p -> p.values.length == 0);
+ return this.cachedAllPartitionPaths;
+ }
+
+ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
+ if (!isAllInputFileSlicesCached) {
+ doRefresh();
+ }
+ return cachedAllInputFileSlices;
+ }
+
+ /**
+ * Get input file slice for the given partition. Will use cache directly if
it is computed before.
+ */
+ protected List<FileSlice> getCachedInputFileSlices(PartitionPath partition) {
+ return cachedAllInputFileSlices.computeIfAbsent(partition, (p) -> {
+ FileStatus[] files = loadPartitionPathFiles(p);
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+
+ HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(metaClient, activeTimeline, files);
+
+ Option<String> queryInstant = specifiedQueryInstant.or(() ->
latestInstant.map(HoodieInstant::getTimestamp));
+
+ validate(activeTimeline, queryInstant);
+
+ List<FileSlice> ret;
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ) &&
queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p.path,
queryInstant.get())
+ .collect(Collectors.toList())
+ )
+ .orElse(Collections.emptyList());
+ } else {
+ ret = queryInstant.map(instant ->
+ fileSystemView.getLatestFileSlicesBeforeOrOn(p.path, instant,
true)
+ )
+ .orElse(fileSystemView.getLatestFileSlices(p.path))
+ .collect(Collectors.toList());
+ }
+
+ cachedFileSize +=
ret.stream().mapToLong(BaseHoodieTableFileIndex::fileSliceSize).sum();
+ return ret;
+ });
+ }
+
+ /**
+ * Get partition path with the given partition value
+ * @param partitionNames partition names
+ * @param values partition values
+ * @return partitions that match the given partition values
+ */
+ protected List<PartitionPath> getPartitionPaths(String[] partitionNames,
String[] values) {
+ if (partitionNames.length == 0 || partitionNames.length != values.length) {
+ LOG.info("The input partition names or value is empty, fallback to
return all partition paths");
+ return getAllQueryPartitionPaths();
+ }
+
+ if (cachedAllPartitionPaths != null) {
+ LOG.info("All partition paths have already loaded, use it directly");
+ return cachedAllPartitionPaths;
+ }
+
+ boolean hiveStylePartitioning =
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
+ boolean urlEncodePartitioning =
Boolean.parseBoolean(this.metaClient.getTableConfig().getUrlEncodePartitioning());
+ Map<String, Integer> partitionNamesToIdx = IntStream.range(0,
partitionNames.length)
+ .mapToObj(i -> Pair.of(i, partitionNames[i]))
+ .collect(Collectors.toMap(Pair::getValue, Pair::getKey));
+ StringBuilder queryPartitionPath = new StringBuilder();
+ int idx = 0;
+ for (; idx < partitionNames.length; ++idx) {
+ String columnNames = this.partitionColumns[idx];
+ if (partitionNamesToIdx.containsKey(columnNames)) {
+ int k = partitionNamesToIdx.get(columnNames);
+ String value = urlEncodePartitioning ?
PartitionPathEncodeUtils.escapePathName(values[k]) : values[k];
+ queryPartitionPath.append(hiveStylePartitioning ? columnNames + "=" :
"").append(value).append("/");
Review Comment:
If I understood your intention correctly, then there would be the following
problem: in the partition path, the ordering of the columns is fixed; therefore,
if a query specifies partition column predicates that only reference columns
that do not constitute a prefix of the actual partition path, then this would
not work. For example, let's say there are 2 partition columns A and B, and the
path looks like `A=foo/B=bar`, but the query only specifies a predicate
containing only B.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]