danny0405 commented on code in PR #18229:
URL: https://github.com/apache/hudi/pull/18229#discussion_r2850644442
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -403,16 +405,45 @@ public LookupRuntimeProvider
getLookupRuntimeProvider(LookupContext context) {
Duration duration = conf.get(LOOKUP_JOIN_CACHE_TTL);
boolean asyncEnabled = conf.get(LOOKUP_ASYNC);
int asyncThreadNumber = conf.get(LOOKUP_ASYNC_THREAD_NUMBER);
+ Option<PartitionPruners.PartitionPruner> lookupPruner =
buildLookupPartitionPruner();
return LookupRuntimeProviderFactory.create(
new HoodieLookupFunction(
- new HoodieLookupTableReader(this::getBatchInputFormat, conf),
+ new HoodieLookupTableReader(
+ lookupPruner.isPresent()
+ ? () -> getBatchInputFormatWithPruner(lookupPruner.get())
+ : this::getBatchInputFormat,
+ conf),
(RowType) getProducedDataType().notNull().getLogicalType(),
getLookupKeys(context.getKeys()),
duration,
conf
), asyncEnabled, asyncThreadNumber);
}
+ private Option<PartitionPruners.PartitionPruner>
buildLookupPartitionPruner() {
+ if (!conf.contains(LOOKUP_PARTITIONS) || !isPartitioned()) {
+ return Option.empty();
+ }
+ List<String> partitionPaths =
PartitionPathParser.parseLookupPartitionPaths(
+ conf.get(LOOKUP_PARTITIONS), this.partitionKeys,
conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING));
+ if (partitionPaths.isEmpty()) {
+ return Option.empty();
+ }
+ return Option.of(PartitionPruners.builder()
+ .candidatePartitions(partitionPaths)
+ .build());
+ }
+
+ private InputFormat<RowData, ?>
getBatchInputFormatWithPruner(PartitionPruners.PartitionPruner lookupPruner) {
+ PartitionPruners.PartitionPruner saved = this.partitionPruner;
+ this.partitionPruner = lookupPruner;
+ try {
+ return getBatchInputFormat();
+ } finally {
+ this.partitionPruner = saved;
Review Comment:
You are right, the original code looks simpler. We can revert to the old
implementation as long as we can ensure the file index is not reused by other
scan sources. Maybe we should just nullify the file index manually here to make
it safe. And if we can use SQL filter push-down to specify the partitions of the
dimension table, there is no need to make any changes to `HoodieTableSource`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]