wombatu-kun opened a new pull request, #18229: URL: https://github.com/apache/hudi/pull/18229
### Describe the issue this Pull Request addresses

When Hudi is used as the dimension table of a Flink lookup join, the entire table is loaded into the cache on each reload. For large dimension tables that are partitioned by a slowly changing attribute (e.g., date or region), this means reading and caching data from all partitions, including historical ones that are irrelevant to the running job. This can cause excessive memory pressure and slow cache warm-up.

This PR introduces a new option, `lookup.partitions`, that lets users declare which partitions should be loaded into the lookup join cache, so that only the relevant slice of the dimension table is materialized in memory.

### Summary and Changelog

Users can now limit the lookup join cache to a subset of partitions by specifying the `lookup.partitions` option via a query hint, analogous to the existing `lookup.async` option that controls the lookup execution mode. Each partition is written as comma-separated `key=value` pairs, and multiple partitions are separated by `;` (see the usage sketch at the end of this description).

- **`FlinkOptions.java`** (`hudi-flink-datasource/hudi-flink`)
  - Added the `LOOKUP_PARTITIONS` (`lookup.partitions`) config option of type `String` with no default value.
- **`HoodieTableSource.java`** (`hudi-flink-datasource/hudi-flink`)
  - Modified `getLookupRuntimeProvider()` to build an optional partition pruner from `lookup.partitions` and pass it to the `HoodieLookupTableReader` supplier when the option is set.
  - Added `buildLookupPartitionPruner()`: reads `lookup.partitions`, delegates path parsing to `PartitionPathParser`, and wraps the result in a `PartitionPruners.StaticPartitionPruner` via the existing `PartitionPruners.Builder` API.
  - Added `getBatchInputFormatWithPruner(PartitionPruners.PartitionPruner)`: temporarily substitutes `this.partitionPruner` with the lookup-specific pruner before calling `getBatchInputFormat()`, then restores the original value. This lets the lookup path reuse all of the existing batch input format logic (COW, MOR, read-optimized) without duplication; a sketch of the pattern follows at the end of this description.
- **`PartitionPathParser.java`** (`hudi-common`)
  - Added `parseLookupPartitionPaths(String spec, List<String> partitionKeys, boolean hiveStyle)`, which parses the `lookup.partitions` spec into a list of Hudi partition paths:
    - Validates that every key in the spec belongs to the table's declared partition key set; throws `IllegalArgumentException` with an informative message (including the list of valid keys) on unknown keys.
    - Validates that every token follows the `key=value` format; throws `IllegalArgumentException` on bare values without a key.
    - Constructs paths in `partitionKeys` order regardless of the order in which keys appear in the spec, ensuring correct path generation for both Hive-style (`key=value/key=value`) and plain-value (`value/value`) layouts.

### Impact

When `lookup.partitions` is not configured, behavior is unchanged: the feature is fully opt-in and backward-compatible. When it is configured, only the specified partitions are read during cache population; rows whose lookup keys resolve to an excluded partition will not match any cache entry and will produce null values on the dimension side of a LEFT lookup join. Cache warm-up time and memory footprint shrink in proportion to the fraction of partitions excluded.

### Risk Level

None.

### Documentation Update

A documentation update is needed to describe the new Flink option `lookup.partitions`.
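### Usage and code sketches

A minimal usage sketch of the new option, assuming a hypothetical Hudi dimension table `dim_customers` partitioned by `dt` and a probe stream `orders` with a processing-time attribute `proc_time` (both table definitions are omitted). The `OPTIONS` hint is standard Flink SQL dynamic table options syntax; `lookup.partitions` is the option added by this PR.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupPartitionsUsage {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // CREATE TABLE statements for `orders` and the Hudi-backed
    // `dim_customers` are omitted for brevity.

    // Only the two listed partitions are loaded into the lookup cache.
    // Within one partition, key=value pairs are comma-separated; multiple
    // partitions are separated by ';'.
    tEnv.executeSql(
        "SELECT o.order_id, c.customer_name "
            + "FROM orders AS o "
            + "LEFT JOIN dim_customers "
            + "/*+ OPTIONS('lookup.partitions'='dt=2025-01-01;dt=2025-01-02') */ "
            + "FOR SYSTEM_TIME AS OF o.proc_time AS c "
            + "ON o.customer_id = c.customer_id");
  }
}
```

As noted under Impact, probe rows whose matching dimension rows live in a partition outside the listed set find no cache entry and get NULL on the dimension side of the LEFT join.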
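A sketch of the new config option described for `FlinkOptions.java`, using Flink's standard `ConfigOptions` builder. The key, `String` type, and absence of a default value follow the changelog above; the description text and surrounding class are illustrative.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class FlinkOptionsSketch {
  // String-typed option with no default value, as described in the changelog.
  public static final ConfigOption<String> LOOKUP_PARTITIONS = ConfigOptions
      .key("lookup.partitions")
      .stringType()
      .noDefaultValue()
      .withDescription(
          "Partitions to load into the lookup join cache. Each partition is a "
              + "comma-separated list of key=value pairs; multiple partitions "
              + "are separated by ';'. When unset, all partitions are loaded.");
}
```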
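A self-contained sketch of the parsing behavior described for `PartitionPathParser.parseLookupPartitionPaths`. The two validations and the key-ordering guarantee follow the changelog; the exact error-message wording and the handling of partially specified partitions (rejected here) are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class PartitionPathParserSketch {

  public static List<String> parseLookupPartitionPaths(
      String spec, List<String> partitionKeys, boolean hiveStyle) {
    List<String> paths = new ArrayList<>();
    // Multiple partitions are separated by ';'.
    for (String partitionSpec : spec.split(";")) {
      Map<String, String> keyToValue = new LinkedHashMap<>();
      // Within one partition, key=value pairs are comma-separated.
      for (String token : partitionSpec.split(",")) {
        String[] pair = token.trim().split("=", 2);
        if (pair.length != 2) {
          // Bare values without a key are rejected.
          throw new IllegalArgumentException(
              "Invalid partition token '" + token.trim() + "': expected key=value");
        }
        String key = pair[0].trim();
        if (!partitionKeys.contains(key)) {
          // Unknown keys are rejected with an informative message that
          // includes the list of valid keys.
          throw new IllegalArgumentException(
              "Unknown partition key '" + key + "'; valid keys: " + partitionKeys);
        }
        keyToValue.put(key, pair[1].trim());
      }
      // Paths are built in partitionKeys order, not in spec order.
      // (Assumption: each partition spec must cover every partition key.)
      String path = partitionKeys.stream()
          .map(key -> {
            String value = keyToValue.get(key);
            if (value == null) {
              throw new IllegalArgumentException(
                  "Missing value for partition key '" + key + "'");
            }
            return hiveStyle ? key + "=" + value : value;
          })
          .collect(Collectors.joining("/"));
      paths.add(path);
    }
    return paths;
  }

  public static void main(String[] args) {
    // 'region' precedes 'dt' in the spec, but paths come out in declared
    // key order: [dt=2025-01-01/region=eu, dt=2025-01-02/region=us]
    System.out.println(parseLookupPartitionPaths(
        "region=eu,dt=2025-01-01;dt=2025-01-02,region=us",
        Arrays.asList("dt", "region"), true));
  }
}
```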
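A sketch of the control flow described for `buildLookupPartitionPruner()`. The real code wraps the parsed paths in a `PartitionPruners.StaticPartitionPruner` via the existing `PartitionPruners.Builder` API; those signatures are not shown in this PR description, so the pruner interface and wiring below are simplified stand-ins. It reuses `PartitionPathParserSketch` from the previous snippet.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public final class LookupPrunerSketch {

  // Simplified stand-in for Hudi's PartitionPruners.PartitionPruner.
  interface PartitionPruner {
    Set<String> filter(Set<String> allPartitionPaths);
  }

  // Stand-in for the StaticPartitionPruner wiring: keep only declared paths.
  static PartitionPruner staticPruner(Set<String> declaredPaths) {
    return all -> {
      Set<String> kept = new LinkedHashSet<>(all);
      kept.retainAll(declaredPaths);
      return kept;
    };
  }

  // Mirrors the described flow: empty when lookup.partitions is unset,
  // otherwise parse the spec and wrap the resulting paths in a static pruner.
  static Optional<PartitionPruner> buildLookupPartitionPruner(
      String lookupPartitionsSpec, List<String> partitionKeys, boolean hiveStyle) {
    if (lookupPartitionsSpec == null || lookupPartitionsSpec.trim().isEmpty()) {
      return Optional.empty();
    }
    List<String> paths = PartitionPathParserSketch.parseLookupPartitionPaths(
        lookupPartitionsSpec, partitionKeys, hiveStyle);
    return Optional.of(staticPruner(new LinkedHashSet<>(paths)));
  }
}
```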
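Finally, a self-contained sketch of the substitute-and-restore pattern described for `getBatchInputFormatWithPruner()`. Types are simplified stand-ins; the real method lives in `HoodieTableSource` and delegates to `getBatchInputFormat()`. Whether the original restores the field via try/finally is an assumption.

```java
public final class SwapRestoreSketch {

  interface PartitionPruner { }  // stand-in for PartitionPruners.PartitionPruner
  interface InputFormat { }      // stand-in for the Flink InputFormat

  private PartitionPruner partitionPruner;  // pruner used by the batch read path

  private InputFormat getBatchInputFormat() {
    // The existing batch logic (COW, MOR, read-optimized) reads
    // this.partitionPruner, which is why a temporary swap reuses it unchanged.
    return new InputFormat() { };
  }

  InputFormat getBatchInputFormatWithPruner(PartitionPruner lookupPruner) {
    final PartitionPruner original = this.partitionPruner;
    this.partitionPruner = lookupPruner;  // substitute the lookup-specific pruner
    try {
      return getBatchInputFormat();       // reuse all existing batch logic
    } finally {
      this.partitionPruner = original;    // restore even if creation fails
    }
  }
}
```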
### Contributor's checklist

- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Enough context is provided in the sections above
- [ ] Adequate tests were added if applicable
