wombatu-kun commented on code in PR #18229:
URL: https://github.com/apache/hudi/pull/18229#discussion_r2840897721
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -403,16 +405,45 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
Duration duration = conf.get(LOOKUP_JOIN_CACHE_TTL);
boolean asyncEnabled = conf.get(LOOKUP_ASYNC);
int asyncThreadNumber = conf.get(LOOKUP_ASYNC_THREAD_NUMBER);
+ Option<PartitionPruners.PartitionPruner> lookupPruner = buildLookupPartitionPruner();
return LookupRuntimeProviderFactory.create(
new HoodieLookupFunction(
- new HoodieLookupTableReader(this::getBatchInputFormat, conf),
+ new HoodieLookupTableReader(
+ lookupPruner.isPresent()
+ ? () -> getBatchInputFormatWithPruner(lookupPruner.get())
+ : this::getBatchInputFormat,
+ conf),
(RowType) getProducedDataType().notNull().getLogicalType(),
getLookupKeys(context.getKeys()),
duration,
conf
), asyncEnabled, asyncThreadNumber);
}
+ private Option<PartitionPruners.PartitionPruner> buildLookupPartitionPruner() {
+ if (!conf.contains(LOOKUP_PARTITIONS) || !isPartitioned()) {
+ return Option.empty();
+ }
+ List<String> partitionPaths = PartitionPathParser.parseLookupPartitionPaths(
+ conf.get(LOOKUP_PARTITIONS), this.partitionKeys, conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING));
+ if (partitionPaths.isEmpty()) {
+ return Option.empty();
+ }
+ return Option.of(PartitionPruners.builder()
+ .candidatePartitions(partitionPaths)
+ .build());
+ }
+
+ private InputFormat<RowData, ?> getBatchInputFormatWithPruner(PartitionPruners.PartitionPruner lookupPruner) {
+ PartitionPruners.PartitionPruner saved = this.partitionPruner;
+ this.partitionPruner = lookupPruner;
+ try {
+ return getBatchInputFormat();
+ } finally {
+ this.partitionPruner = saved;
Review Comment:
@danny0405 OK, I've refactored this code (in a separate commit). I agree with
you that it was error-prone, but it was not enough to just pass the
PartitionPruner into getBatchInputFormat, because the pruner is used deeper
in the call chain (in `FileIndexReader`). I think the result is now no less
error-prone, just more complicated.
Please review it. If it's OK with you, approve it, and I'll update the PR
description and merge.
If during review you decide that the first version was better, let me know
and I'll drop the second commit and re-request review.
--
This is an automated message from the Apache Git Service.