Hi Lincoln,

Thanks for your reply. Weijie and I discussed these two issues offline, and here are the results of our discussion:

1. When the user specifies the hash lookup join hint introduced by FLIP-204[1], the planner should ignore the `SupportsLookupCustomShuffle` interface. The hint is specified explicitly by the user in SQL, so letting it take precedence is more in line with user intuition. WDYT?

2. We agree with introducing the `isDeterministic` method. The `SupportsLookupCustomShuffle` interface introduces a custom shuffle. If that shuffle is non-deterministic, the changelog events of the same key may be routed to different tasks, so ADD/UPDATE_AFTER events (+I, +U) can arrive after their related UPDATE_BEFORE/DELETE events (-U, -D). This breaks the ordering assumption that the current Flink sink operator relies on[2]. Therefore, if `isDeterministic` returns false and the input changelog is not insert-only, the planner should not apply the shuffle provided by `SupportsLookupCustomShuffle`.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2] https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness

Best,
Wencong

At 2024-06-11 00:02:57, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>Hi Weijie,
>
>Thanks for your proposal, this will be a useful advanced optimization for
>connector developers!
>
>I have two questions:
>
>1. FLIP-204[1] hash lookup join hint is mentioned in this FLIP, what's the
>apply ordering of the two features? For example, a connector that
>implements the `SupportsLookupCustomShuffle` interface also has a
>`SHUFFLE_HASH` lookup join hint specified by the user in SQL, what's
>the expected behavior?
>
>2. This FLIP considers the relationship with NDU processing, and I agree
>with the current choice to prioritize NDU first. However, we should also
>consider another issue: out-of-orderness of the changelog events in
>streaming[2].
>If the connector developer supplies a non-deterministic
>partitioner, e.g., a random partitioner for anti-skew purposes, then it'll
>break the assumption relied on by current SQL operators in streaming: the
>ADD/UPDATE_AFTER events (+I, +U) always occur before their related
>UPDATE_BEFORE/DELETE events (-D, -U) and they are always
>processed by the same task even if a data shuffle is involved. So a
>straightforward approach would be to add method `isDeterministic` to
>the `InputDataPartitioner` interface to explicitly tell the planner whether
>the partitioner is deterministic or not (then the planner can reject the
>non-deterministic custom partitioner for correctness requirements).
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>[2]
>https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
>
>
>Best,
>Lincoln Lee
>
>
>Xintong Song <tonysong...@gmail.com> wrote on Fri, Jun 7, 2024 at 13:53:
>
>> +1 for this proposal.
>>
>> This FLIP will make it possible for each lookup join parallel task to only
>> access and cache a subset of the data. This will significantly improve the
>> performance and reduce the overhead when using Paimon for the dimension
>> table. And it's general enough to also be leveraged by other connectors.
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <guoweijieres...@gmail.com>
>> wrote:
>>
>> > Hi devs,
>> >
>> >
>> > I'd like to start a discussion about FLIP-462[1]: Support Custom Data
>> > Distribution for Input Stream of Lookup Join.
>> >
>> >
>> > Lookup Join is an important feature in Flink. It is typically used to
>> > enrich a table with data that is queried from an external system.
>> > If we interact with the external systems for each incoming record, we
>> > incur significant network IO and RPC overhead.
>> >
>> > Therefore, most connectors introduce caching to reduce the per-record
>> > level query overhead.
>> > However, because the data distribution of Lookup
>> > Join's input stream is arbitrary, the cache hit rate is sometimes
>> > unsatisfactory.
>> >
>> >
>> > We want to introduce a mechanism for the connector to tell the Flink
>> > planner its desired input stream data distribution or partitioning
>> > strategy. This can significantly reduce the amount of cached data and
>> > improve performance of Lookup Join.
>> >
>> >
>> > You can find more details in this FLIP[1]. Looking forward to hearing
>> > from you, thanks!
>> >
>> >
>> > Best regards,
>> >
>> > Weijie
>> >
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
>> >
>>
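
To make the two conclusions of the reply at the top of this thread concrete, here is a minimal Java sketch of how a planner might combine them: a user-specified hash lookup join hint wins over the connector, and a non-deterministic partitioner is applied only when the input is insert-only. Everything here is a hypothetical, simplified stand-in (the plain `InputDataPartitioner` interface over `List<Object>` keys, `chooseShuffle`, the `Shuffle` enum, and the hash/random partitioners); it is not the interface defined in FLIP-462 or the actual planner code.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;

public class LookupShuffleDecisionSketch {

    // Hypothetical, simplified stand-in for the InputDataPartitioner discussed in this
    // thread; the interface defined in FLIP-462 is not reproduced here.
    interface InputDataPartitioner {
        // Returns the target subtask index in [0, numPartitions) for the given join key.
        int partition(List<Object> joinKey, int numPartitions);

        // True if the same join key is always routed to the same subtask.
        default boolean isDeterministic() {
            return true;
        }
    }

    // Deterministic: hash-based routing keeps all changelog events of one key on one subtask.
    static final class HashPartitioner implements InputDataPartitioner {
        @Override
        public int partition(List<Object> joinKey, int numPartitions) {
            return Math.floorMod(Objects.hashCode(joinKey), numPartitions);
        }
    }

    // Non-deterministic: random routing (e.g. for anti-skew) may send the +I/+U and
    // -U/-D events of the same key to different subtasks.
    static final class RandomPartitioner implements InputDataPartitioner {
        private final Random random = new Random();

        @Override
        public int partition(List<Object> joinKey, int numPartitions) {
            return random.nextInt(numPartitions);
        }

        @Override
        public boolean isDeterministic() {
            return false;
        }
    }

    enum Shuffle {
        HINT_HASH, // shuffle requested by the user's hash lookup join hint (FLIP-204)
        CUSTOM,    // shuffle provided by the connector's custom partitioner
        NONE       // no custom shuffle; the planner keeps its existing behavior
    }

    // Hypothetical planner rule combining the two conclusions of the reply:
    // 1. a user-specified hash lookup join hint takes precedence over the connector;
    // 2. a non-deterministic partitioner is only applied to insert-only input.
    static Shuffle chooseShuffle(
            boolean hasHashLookupHint,
            Optional<InputDataPartitioner> customPartitioner,
            boolean insertOnlyInput) {
        if (hasHashLookupHint) {
            return Shuffle.HINT_HASH;
        }
        if (!customPartitioner.isPresent()) {
            return Shuffle.NONE;
        }
        if (!customPartitioner.get().isDeterministic() && !insertOnlyInput) {
            return Shuffle.NONE;
        }
        return Shuffle.CUSTOM;
    }

    public static void main(String[] args) {
        Optional<InputDataPartitioner> hash = Optional.of(new HashPartitioner());
        Optional<InputDataPartitioner> random = Optional.of(new RandomPartitioner());

        System.out.println(chooseShuffle(true, hash, false));    // HINT_HASH
        System.out.println(chooseShuffle(false, hash, false));   // CUSTOM
        System.out.println(chooseShuffle(false, random, false)); // NONE
        System.out.println(chooseShuffle(false, random, true));  // CUSTOM
    }
}
```

The hint check comes first so that an explicit user choice is never overridden by the connector; the determinism check only matters for updating (non-insert-only) input, since with insert-only input there are no -U/-D events whose ordering could be broken.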