Hi Weijie,

Thanks for your proposal! This will be a useful advanced optimization for
connector developers.
I have two questions:

1. FLIP-204[1], the hash lookup join hint, is mentioned in this FLIP. In
what order do the two features apply? For example, if a connector
implements the `SupportsLookupCustomShuffle` interface and the user also
specifies a `SHUFFLE_HASH` lookup join hint in SQL, what is the expected
behavior?

2. This FLIP considers the relationship with NDU handling, and I agree
with the current choice to prioritize NDU first. However, we should also
consider another issue: out-of-orderness of changelog events in
streaming[2]. If a connector developer supplies a non-deterministic
partitioner, e.g., a random partitioner for anti-skew purposes, it will
break an assumption that the current streaming SQL operators rely on: the
INSERT/UPDATE_AFTER events (+I, +U) always occur before their related
UPDATE_BEFORE/DELETE events (-U, -D), and they are always processed by the
same task even when a data shuffle is involved. So a straightforward
approach would be to add a method `isDeterministic` to the
`InputDataPartitioner` interface to explicitly tell the planner whether
the partitioner is deterministic; the planner could then reject a
non-deterministic custom partitioner to meet the correctness requirements
(a rough illustrative sketch is appended after the quoted thread below).

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2]
https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness

Best,
Lincoln Lee

On Fri, Jun 7, 2024 at 13:53, Xintong Song <tonysong...@gmail.com> wrote:

> +1 for this proposal.
>
> This FLIP will make it possible for each lookup join parallel task to
> only access and cache a subset of the data. This will significantly
> improve performance and reduce the overhead when using Paimon for the
> dimension table. And it's general enough to also be leveraged by other
> connectors.
>
> Best,
>
> Xintong
>
>
> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <guoweijieres...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > I'd like to start a discussion about FLIP-462[1]: Support Custom Data
> > Distribution for Input Stream of Lookup Join.
> >
> > Lookup Join is an important feature in Flink. It is typically used to
> > enrich a table with data that is queried from an external system.
> > If we interact with the external system for each incoming record, we
> > incur significant network I/O and RPC overhead.
> >
> > Therefore, most connectors introduce caching to reduce the per-record
> > query overhead. However, because the data distribution of the Lookup
> > Join's input stream is arbitrary, the cache hit rate is sometimes
> > unsatisfactory.
> >
> > We want to introduce a mechanism for the connector to tell the Flink
> > planner its desired input stream data distribution or partitioning
> > strategy. This can significantly reduce the amount of cached data and
> > improve the performance of Lookup Join.
> >
> > You can find more details in this FLIP[1]. Looking forward to hearing
> > from you, thanks!
> >
> > Best regards,
> >
> > Weijie
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
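(Sketch referenced in question 2 above.) This is only meant to illustrate
the suggestion: apart from the proposed isDeterministic() method, the
interface shape and the partition(...) signature below are placeholders I
made up for illustration and may well differ from what FLIP-462 actually
defines.

import java.io.Serializable;

import org.apache.flink.table.data.RowData;

/**
 * Illustrative sketch only: the real InputDataPartitioner in FLIP-462 may
 * declare different methods; this just shows where an isDeterministic()
 * flag could live.
 */
public interface InputDataPartitioner extends Serializable {

    /**
     * Placeholder partitioning method: maps the join-key fields of an
     * input record to a target subtask.
     */
    int partition(RowData joinKeyRow, int numPartitions);

    /**
     * Whether this partitioner always routes equal join keys to the same
     * subtask. A non-deterministic partitioner (e.g. a random anti-skew
     * partitioner) would return false, so the planner can reject it and
     * preserve the changelog ordering assumption described in [2].
     */
    default boolean isDeterministic() {
        return true;
    }
}

Defaulting isDeterministic() to true would keep existing deterministic
partitioners working unchanged, while non-deterministic ones would have to
opt out explicitly so the planner can act on it.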