Hi, Weiqing.

After reading the new FLIP, I have no issues with the `public interface`
part. I only have some questions about the details in the Proposed Changes
section.
Regarding the ModifyKind and UpdateKind of the IntervalJoin node, IIUC:
- When early firing is enabled, the UpdateKind of the IntervalJoin can be
ONLY_UPDATE_AFTER, or degrade to BEFORE_AND_AFTER, depending entirely on
the requirements of the sink. The ModifyKind is always ALL.
- When early firing is disabled, the UpdateKind of the IntervalJoin is NONE,
and the ModifyKind is INSERT.
- In either case, whether early firing is enabled or disabled, the
IntervalJoin should always require its input to be insert-only, i.e.
ModifyKind INSERT_ONLY and UpdateKind NONE.
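
To make sure we mean the same thing, here is a minimal sketch of the three
rules above in plain Python. It is only a model of my understanding, not
the planner code: the names `interval_join_output`,
`interval_join_required_input` and `sink_needs_update_before` are
hypothetical, and I write the disabled-case ModifyKind as INSERT_ONLY.

```python
from enum import Enum
from typing import NamedTuple


class ModifyKind(Enum):
    INSERT_ONLY = "INSERT_ONLY"
    ALL = "ALL"


class UpdateKind(Enum):
    NONE = "NONE"
    ONLY_UPDATE_AFTER = "ONLY_UPDATE_AFTER"
    BEFORE_AND_AFTER = "BEFORE_AND_AFTER"


class Traits(NamedTuple):
    modify_kind: ModifyKind
    update_kind: UpdateKind


def interval_join_output(early_fire: bool, sink_needs_update_before: bool) -> Traits:
    """Changelog traits produced by the IntervalJoin (hypothetical helper)."""
    if not early_fire:
        # Without early firing the join stays append-only.
        return Traits(ModifyKind.INSERT_ONLY, UpdateKind.NONE)
    # With early firing the join may update earlier results; whether
    # UPDATE_BEFORE messages are emitted depends only on the sink.
    if sink_needs_update_before:
        return Traits(ModifyKind.ALL, UpdateKind.BEFORE_AND_AFTER)
    return Traits(ModifyKind.ALL, UpdateKind.ONLY_UPDATE_AFTER)


def interval_join_required_input() -> Traits:
    """Traits the IntervalJoin requires from its inputs, in every mode."""
    return Traits(ModifyKind.INSERT_ONLY, UpdateKind.NONE)
```

For example, `interval_join_output(True, False)` gives ALL with
ONLY_UPDATE_AFTER, while `interval_join_required_input()` is insert-only
regardless of the early-fire setting.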
--
Best!
Xuyang
At 2025-01-09 15:30:44, "Weiqing Yang" <[email protected]> wrote:
>Hi Xingcan and Xuyang,
>
>Thanks so much for the feedback - it was very helpful!
>
>*> 1. The current output stream of a time interval outer join is an
>append-only stream. This change will make it a potential retractable
>stream. I'm not sure if the planner supports a dynamic output type like
>that. Could you add this part to your design doc?*
>
>
> - Yes, enabling early firing on time interval outer joins can emit
> retractions when previously emitted rows are updated or invalidated by
> later matches. I’ve updated the proposal (Planner Awareness
> <https://docs.google.com/document/d/1YobpNdnvzSsceniVj4NZWi445gb1-54Rox-D7nPArZo/edit?tab=t.0#heading=h.y5w17oloacws>
> and Changes in FlinkChangelogModeInferenceProgram
> <https://docs.google.com/document/d/1YobpNdnvzSsceniVj4NZWi445gb1-54Rox-D7nPArZo/edit?tab=t.0#heading=h.z6qdwrvtgn4u>)
> to clarify that the stream might switch from append-only to a
> retract/upsert stream. Let me know if anything is missing.
>
>
>*> 2. What's the use case when the downstream components need to get the
>early fired results regularly?*
>
>
> - The new INTERVAL option (in addition to DELAY) allows periodic updates
> (e.g., every 10 minutes) after the initial delay. This captures how results
> evolve over time, similar to Apache Beam’s “Repeatedly” option.
>
>
>*> 3. The time interval join operator itself is not quite efficient when
>the state becomes large. Will there be any extra overhead after introducing
>this feature?*
>
> - Early firing does introduce some overhead, since partial matches may
> be emitted multiple times with retractions (duplicate outputs are
> avoided, though). However, if it’s disabled, there is no additional cost.
> Most users find the performance trade-off acceptable for the real-time
> insights it provides.
>
>
>*> 1. Currently, there are some configs related to early firing available
>to users: `table.exec.emit.early-fire.enabled` and
>`table.exec.emit.early-fire.delay`.
>Although their documentation states that they are only applicable to the
>Window operator, it seems possible that they may also be relevant in the
>context of this FLIP. Otherwise, having different early firing behaviors
>for different operators could confuse users.*
>
> - +1 on unifying early-fire behaviors to avoid confusion. I’ve added a
> section
> <https://docs.google.com/document/d/1YobpNdnvzSsceniVj4NZWi445gb1-54Rox-D7nPArZo/edit?tab=t.0#heading=h.rr0i3gmdjt4q>
> in the proposal highlighting that we should align hint-based interval join
> configurations with the existing table.exec.emit.* settings. Suggestions
> on how to unify them are welcome! We plan to extend early firing
> to window joins via hints in a future FLIP.
>
>
>*> 2. The design of `time_mode` is excellent. Similar to point 1, perhaps
>we can introduce it to other window-related operators in the future.*
>
>*> 3. You need to modify the FlinkChangelogModeInferenceProgram to ensure
>that downstream nodes of interval joins with early firing enabled are
>aware of retract or upsert messages.*
>
> - We agree that time_mode could be introduced to other window-based
> operators down the road. We also want to support early firing for
> window joins. Also, thanks for highlighting
> FlinkChangelogModeInferenceProgram! I added the corresponding code change
> <https://docs.google.com/document/d/1YobpNdnvzSsceniVj4NZWi445gb1-54Rox-D7nPArZo/edit?tab=t.0#heading=h.z6qdwrvtgn4u>
> to the proposal.
>
>
>Thanks again for your time and feedback! I’ve updated the proposal with
>these points. Please let me know if there’s anything else I should address.
>
>Best,
>Weiqing
>
>
>On Mon, Jan 6, 2025 at 6:32 PM Xuyang <[email protected]> wrote:
>
>> Hi, Weiqing. Thank you for drafting this FLIP. I have a few questions:
>>
>> 1. Currently, there are some configs related to early firing available to
>> users: `table.exec.emit.early-fire.enabled` and
>>
>> `table.exec.emit.early-fire.delay`. Although their documentation states
>> that they are only applicable to the Window operator,
>>
>> it seems possible that they may also be relevant in the context of this
>> FLIP. Otherwise, having different early firing behaviors
>>
>> for different operators could confuse users.
>>
>> 2. The design of `time_mode` is excellent. Similar to point 1, perhaps we
>> can introduce it to other window-related operators
>>
>> in the future.
>>
>> 3. You need to modify the FlinkChangelogModeInferenceProgram to ensure
>> that downstream nodes of interval joins with
>>
>> early firing enabled are aware of retract or upsert messages.
>>
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>> At 2025-01-07 06:35:51, "Xingcan Cui" <[email protected]> wrote:
>> >Hi Weiqing,
>> >
>> >Thanks for the proposal. IMO, adding early fire for time interval outer
>> >joins is feasible overall. I have a few questions.
>> >
>> >1. The current output stream of a time interval outer join is an
>> >append-only stream. This change will make it a potential retractable
>> >stream. I'm not sure if the planner supports a dynamic output type like
>> >that. Could you add this part to your design doc?
>> >2. What's the use case when the downstream components need to get the
>> early
>> >fired results regularly?
>> >3. The time interval join operator itself is not quite efficient when the
>> >state becomes large. Will there be any extra overhead after introducing
>> >this feature?
>> >
>> >Thanks,
>> >Xingcan
>> >
>> >On Mon, Jan 6, 2025 at 4:11 PM Weiqing Yang <[email protected]>
>> >wrote:
>> >
>> >> Hi all,
>> >>
>> >> Just a gentle reminder regarding the proposal I shared on early fire
>> >> support for Flink SQL interval joins. I’d greatly appreciate your
>> feedback
>> >> or suggestions.
>> >>
>> >> Here’s the link to the proposal document: Link
>> >> <
>> >>
>> https://docs.google.com/document/d/1YobpNdnvzSsceniVj4NZWi445gb1-54Rox-D7nPArZo/edit?tab=t.0#heading=h.z7bl0h2hwkph
>> >> >
>> >>
>> >> Thank you!
>> >>
>> >> Best,
>> >> Weiqing
>> >>
>> >> On Sun, Dec 22, 2024 at 11:19 PM Weiqing Yang <[email protected]
>> >
>> >> wrote:
>> >>
>> >> > Hi all,
>> >> >
>> >> > I’d like to initiate a discussion about introducing early fire support
>> >> for
>> >> > Flink SQL interval joins.
>> >> >
>> >> > *Motivation*
>> >> > In many streaming applications, particularly real-time analytics and
>> >> > monitoring systems, it is valuable to obtain partial results earlier
>> >> rather
>> >> > than waiting for full join conditions to finalize. For Flink SQL
>> interval
>> >> > joins, results are typically delayed until watermarks ensure no more
>> >> > matches can occur. This delay can be challenging for scenarios that
>> >> require
>> >> > fast feedback. Early fire support addresses this by emitting
>> intermediate
>> >> > results speculatively and using retractions or updates to maintain
>> >> eventual
>> >> > consistency and ensure correctness.
>> >> >
>> >> > Here’s the proposal document: Link
>> >> > <
>> >>
>> https://docs.google.com/document/d/1YobpNdnvzSsceniVj4NZWi445gb1-54Rox-D7nPArZo/edit?tab=t.0#heading=h.z7bl0h2hwkph
>> >> >
>> >> >
>> >> > Your feedback and ideas are welcome to refine this feature.
>> >> >
>> >> > Thanks,
>> >> > Weiqing
>> >> >
>> >>
>>