Thanks Weijie & Wencong for your update including the conclusions of
the offline discussion.

There's one thing need to be confirmed in the FLIP:
> The hint only provides a suggestion to the optimizer, it is not an
enforcer. As a result, If the target dim table not implements
SupportsLookupCustomShuffle, planner will ignore this newly introduced
shuffle option.

Since we've decided to extend a new hint option 'shuffle' to the current
`LOOKUP` join hint, do we support hash shuffle as well?(It seems like it
shouldn't require a lot of extra work, right?)
This will deliver a complete new feature to users,  also because
FLIP-204 is stale for now and this new extension will give user a more
simpler way to achieve the goal, WDYT?

Another small comment for the new interface:
> "... planner may not apply this partitioner in upsert mode ..."
> default boolean isDeterministic()
"upsert mode" should be "updating stream" or "non-insert-only stream".


Best,
Lincoln Lee


Wencong Liu <liuwencle...@163.com> 于2024年6月12日周三 21:43写道:

> Hi Jingsong,
>
>
> Some of the points you mentioned are currently clarified in
> the updated FLIP. Please check it out.
>
>
> 1. Enabling custom data distribution can be done through the
> LOOKUP SQL Hint. There are detailed examples provided in the FLIP.
>
>
> 2. We will add the isDeterministic method to the `InputDataPartitioner`
> interface, which will return true by default. If the
> `InputDataPartitioner`
> is not deterministic, the connector developer need to override the
> isDeterministic method to return false. If the connector developer
> cannot ensure this protocol, they will need to bear the correctness
> issues that arise.
>
>
> 3. Yes, this feature will work in batch mode as well.
>
>
> Best regards,
> Wencong
>
>
>
>
>
> At 2024-06-11 23:47:40, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> >Hi all,
> >
> >+1 to this FLIP, very thanks all for your proposal.
> >
> >isDeterministic looks good to me too.
> >
> >We can consider stating the following points:
> >
> >1. How to enable custom data distribution? Is it a dynamic hint? Can
> >you provide an SQL example.
> >
> >2. What impact will it have when the mainstream is changelog? Causing
> >disorder? This may need to be emphasized.
> >
> >3. Does this feature work in batch mode too?
> >
> >Best,
> >Jingsong
> >
> >On Tue, Jun 11, 2024 at 8:22 PM Wencong Liu <liuwencle...@163.com> wrote:
> >>
> >> Hi Lincoln,
> >>
> >>
> >> Thanks for your reply. Weijie and I discussed these two issues offline,
> >> and here are the results of our discussion:
> >> 1. When the user utilizes the hash lookup join hint introduced by
> FLIP-204[1],
> >> the `SupportsLookupCustomShuffle` interface should be ignored. This is
> because
> >> the hash lookup join hint is directly specified by the user through a
> SQL HINT,
> >> which is more in line with user intuition. WDYT?
> >> 2. We agree with the introduction of the `isDeterministic` method. The
> >> `SupportsLookupCustomShuffle` interface introduces a custom shuffle,
> which
> >> can cause ADD/UPDATE_AFTER events (+I, +U) to appear
> >> after UPDATE_BEFORE/DELETE events (-D, -U), thus breaking the current
> >> limitations of the Flink Sink Operator[2]. If `isDeterministic` returns
> false and the
> >> changelog event type is not insert-only, the Planner should not apply
> the shuffle
> >> provided by `SupportsLookupCustomShuffle`.
> >>
> >>
> >> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> >> [2]
> https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
> >>
> >>
> >> Best,
> >> Wencong
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> At 2024-06-11 00:02:57, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
> >> >Hi Weijie,
> >> >
> >> >Thanks for your proposal, this will be a useful advanced optimization
> for
> >> >connector developers!
> >> >
> >> >I have two questions:
> >> >
> >> >1. FLIP-204[1] hash lookup join hint is mentioned in this FLIP, what's
> the
> >> >apply ordering of the two feature? For example, a connector that
> >> >implements the `SupportsLookupCustomShuffle` interface also has a
> >> >`SHUFFLE_HASH` lookup join hint specified by the user in sql, what's
> >> >the expected behavior?
> >> >
> >> >2. This FLIP considers the relationship with NDU processing, and I
> agree
> >> >with the current choice to prioritize NDU first. However, we should
> also
> >> >consider another issue: out-of-orderness of the changelog events in
> >> >streaming[2]. If the connector developer supplies a non-deterministic
> >> >partitioner, e.g., a random partitioner for anti-skew purpose, then
> it'll
> >> >break the assumption relied by current SQL operators in streaming: the
> >> >ADD/UDPATE_AFTER events (+I, +U) always occur before its related
> >> >UDPATE_BEFORE/DELETE events (-D, -U) and they are always
> >> >processed by the same task even if a data shuffle is involved. So a
> >> >straightforward approach would be to add method `isDeterministic` to
> >> >the `InputDataPartitioner` interface to explicitly tell the planner
> whether
> >> >the partitioner is deterministic or not(then the planner can reject the
> >> >non-deterministic custom partitioner for correctness requirements).
> >> >
> >> >[1]
> >> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> >> >[2]
> >> >
> https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness
> >> >
> >> >
> >> >Best,
> >> >Lincoln Lee
> >> >
> >> >
> >> >Xintong Song <tonysong...@gmail.com> 于2024年6月7日周五 13:53写道:
> >> >
> >> >> +1 for this proposal.
> >> >>
> >> >> This FLIP will make it possible for each lookup join parallel task
> to only
> >> >> access and cache a subset of the data. This will significantly
> improve the
> >> >> performance and reduce the overhead when using Paimon for the
> dimension
> >> >> table. And it's general enough to also be leveraged by other
> connectors.
> >> >>
> >> >> Best,
> >> >>
> >> >> Xintong
> >> >>
> >> >>
> >> >>
> >> >> On Fri, Jun 7, 2024 at 10:01 AM weijie guo <
> guoweijieres...@gmail.com>
> >> >> wrote:
> >> >>
> >> >> > Hi devs,
> >> >> >
> >> >> >
> >> >> > I'd like to start a discussion about FLIP-462[1]: Support Custom
> Data
> >> >> > Distribution for Input Stream of Lookup Join.
> >> >> >
> >> >> >
> >> >> > Lookup Join is an important feature in Flink, It is typically used
> to
> >> >> > enrich a table with data that is queried from an external system.
> >> >> > If we interact with the external systems for each incoming record,
> we
> >> >> > incur significant network IO and RPC overhead.
> >> >> >
> >> >> > Therefore, most connectors introduce caching to reduce the
> per-record
> >> >> > level query overhead. However, because the data distribution of
> Lookup
> >> >> > Join's input stream is arbitrary, the cache hit rate is sometimes
> >> >> > unsatisfactory.
> >> >> >
> >> >> >
> >> >> > We want to introduce a mechanism for the connector to tell the
> Flink
> >> >> > planner its desired input stream data distribution or partitioning
> >> >> > strategy. This can significantly reduce the amount of cached data
> and
> >> >> > improve performance of Lookup Join.
> >> >> >
> >> >> >
> >> >> > You can find more details in this FLIP[1]. Looking forward to
> hearing
> >> >> > from you, thanks!
> >> >> >
> >> >> >
> >> >> > Best regards,
> >> >> >
> >> >> > Weijie
> >> >> >
> >> >> >
> >> >> > [1]
> >> >> >
> >> >> >
> >> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
> >> >> >
> >> >>
>

Reply via email to