Thank you Timo for your response and the adjustments!

I have no further questions or comments.
So from my side I'm good to move forward with this FLIP.

Cheers, Fabian

On Fri, Nov 8, 2024 at 6:03 PM Timo Walther <> wrote:

> Hi David,
> happy to answer your questions as well:
>  > 1.  I wonder what class of scenarios that PTF’s do not do well in and
>  > that DataStream API would be preferable/required? Assuming we have PTF
>  > Map and List support.
> DataStream API supports broadcast state, async operators, custom
> serializers on state, more flexible state access, custom watermark
> generators, and processing time. So everything that is closer to the
> actual runtime (Flink runtime context) might be better suited in
> DataStream API. But of course, time will reveal what is really necessary
> in SQL. We shouldn't overload the SQL API.
>  >  2.  Does this mean “Registers a timer. After that the time interval,
>  > the watermark is progressed to that time by the framework and the
>  > timer is fired.“
> The JavaDoc might need a bit of improvement during the implementation
> phase. But in general there is nothing special about the semantics. They
> will be 1:1 like in DataStream API. A watermark W will trigger all
> timers up to where timestamp T <= W.
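
To make the rule "a watermark W triggers all timers where T <= W" concrete, here is a minimal sketch in plain Java. It is not Flink code: the class `NamedTimerSketch` and the method `advanceWatermark` are hypothetical stand-ins, and only the "replaces an existing timer under the same name" behavior is taken from the JavaDoc quoted in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a named timer service; not a Flink class. */
final class NamedTimerSketch {
    // One pending timestamp per timer name.
    private final Map<String, Long> timers = new HashMap<>();

    /** Registers a timer; an existing timer under the same name is replaced. */
    void registerOnTime(String name, long time) {
        timers.put(name, time);
    }

    /** Advances the watermark to w and returns the names of all timers with T <= w, ordered by T. */
    List<String> advanceWatermark(long w) {
        List<Map.Entry<String, Long>> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : timers.entrySet()) {
            if (e.getValue() <= w) {
                due.add(Map.entry(e.getKey(), e.getValue()));
            }
        }
        due.sort((a, b) -> Long.compare(a.getValue(), b.getValue()));
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, Long> e : due) {
            timers.remove(e.getKey()); // a fired timer is no longer pending
            fired.add(e.getKey());
        }
        return fired;
    }

    public static void main(String[] args) {
        NamedTimerSketch t = new NamedTimerSketch();
        t.registerOnTime("timeout", 10L);
        t.registerOnTime("timeout", 20L); // replaces the timer registered for 10
        t.registerOnTime("cleanup", 15L);
        System.out.println(t.advanceWatermark(15L)); // prints [cleanup]
        System.out.println(t.advanceWatermark(20L)); // prints [timeout]
    }
}
```

Note that nothing fires unless a watermark arrives, which matches the "business as usual" answer: the function itself never advances time.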
>  >  1.  When it says “A PTF can still make progress in time without an
>  > explicit on_time attribute”, how would it do this? Is business as
>  > usual with the watermark generator or is there also something else the
>  > PTF can do?
> Yes, business as usual. The implementation can also work purely on
> watermarks. The PTF can call `timerContext.registerOnNextWatermark()` if
> no time attribute is available via on_time arg.
> Similar to DataStream API windows, the rowtime of a watermark-fired
> operator is always T = W - 1, because it needs to be smaller than the
> current watermark.
>  >  2., 3. ,4.
> I reworked/removed the affected sections.
> Cheers,
> Timo
> On 08.11.24 17:47, Timo Walther wrote:
> > Hi Fabian,
> >
> > thanks for this massive feedback. It took me quite some time to digest
> > it. I hope I covered most of your concerns. I gave the whole FLIP a
> > major update to reflect your suggestions.
> >
> >
> > 1. "Scoping and Simplifications", "Partition and Order Semantics"
> >
> >  > What does "by default" mean? Will there be a case where we won't
> >  > require partitioning?
> >
> > In Flink terms this means: If you set an argument as
> > `@ArgumentHint(TABLE_AS_SET)`, it will become a KeyedProcessFunction by
> > default. The reason for that is to avoid common mistakes made by
> > DataStream API users and enforce parallel execution. A ProcessFunction
> > (unkeyed) should be avoided, but is sometimes unavoidable. It can be
> > enabled by `@ArgumentHint(TABLE_AS_SET, OPTIONAL_PARTITION_BY)`. Use
> > cases could be e.g. a global COUNT(*) or global sort.
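
A rough sketch of the partitioning rule described above, in plain Java. The enum `Trait` only mirrors the two trait names used in this thread and is a stand-in, not Flink's class; `plan` and `Execution` are hypothetical names for illustration.

```java
import java.util.EnumSet;

/** Hypothetical model of the rule; the enums are stand-ins, not Flink classes. */
final class PartitionRuleSketch {
    enum Trait { TABLE_AS_SET, OPTIONAL_PARTITION_BY }

    enum Execution { KEYED, UNKEYED }

    /** Decides how a table argument with set semantics would be executed. */
    static Execution plan(EnumSet<Trait> traits, boolean callHasPartitionBy) {
        if (!traits.contains(Trait.TABLE_AS_SET)) {
            throw new IllegalArgumentException("this sketch only covers set semantics");
        }
        if (callHasPartitionBy) {
            return Execution.KEYED; // behaves like a KeyedProcessFunction
        }
        if (traits.contains(Trait.OPTIONAL_PARTITION_BY)) {
            return Execution.UNKEYED; // e.g. a global COUNT(*) or global sort
        }
        throw new IllegalStateException("PARTITION BY is required for this table argument");
    }

    public static void main(String[] args) {
        System.out.println(plan(EnumSet.of(Trait.TABLE_AS_SET), true)); // prints KEYED
        System.out.println(plan(EnumSet.of(Trait.TABLE_AS_SET, Trait.OPTIONAL_PARTITION_BY), false)); // prints UNKEYED
    }
}
```

The default therefore fails early when PARTITION BY is missing, and the unkeyed case has to be opted into explicitly.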
> >
> > 2. "Public Interfaces", "ProcessTableFunction"
> >
> >  > 2.1 Can the `ProcessTableFunction.getKind()` method be `final`?
> >
> > Absolutely. That was a typo. Fixed.
> >
> >  > 2.2 Should we also expose the timestamp for which the timer was set?
> >
> > The timestamp is exposed via `OnTimerContext.timeContext().time()`.
> >
> >  > 2.3 (minor) "As" in `Context.getTimeContextAs(Class<T>)` implies IMO
> >  > that the method returns an instance of T. Maybe rename to
> >  > `getTimeContext(Class<T>)` or `getTimeContextFor(Class<T>)`
> >
> > Makes sense. I renamed it to `timeContext()` to keep it short.
> >
> > 3. "Public Interfaces", "TimeContext":
> >
> >  >  3.1 `currentTime()`: this method name is a bit confusing IMO. I would
> >  > say, the "current time" is the WM time, because the WM drives
> >  > the global event-time clock of the query. I think it would be good,
> >  > to make it clear that the method returns the event-time column of the
> >  > current row.
> >
> > I reworked the JavaDoc of the interface and call it `time()` now. A
> > short and generic term might be the best for this method. `rowtime()`
> > does not fit entirely as it is also used within OnTimerContext.
> >
> >  >  3.2 `currentWatermark()`: should we rather emit `0` when no WM has
> >  > been received yet?
> >
> > The value is not a Long data type. A 0 would just be confusing as a
> > LocalDateTime. It's better to make it explicit in this case and return
> > null.
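
To illustrate why `0` is a poor "no watermark yet" value once the type is a date-time, here is a small plain-Java sketch. It assumes that the internal "no watermark yet" marker is `Long.MIN_VALUE` and that the conversion uses UTC; both are assumptions for illustration, and `WatermarkSketch` is a hypothetical class.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical conversion helper; not a Flink class. */
final class WatermarkSketch {
    // Assumption: the runtime marks "no watermark received yet" with Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Returns the watermark as a LocalDateTime, or null if none has been received. */
    static LocalDateTime currentWatermark(long internalWatermarkMillis) {
        if (internalWatermarkMillis == NO_WATERMARK) {
            return null; // explicit "unknown" instead of a misleading timestamp
        }
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(internalWatermarkMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(currentWatermark(NO_WATERMARK)); // prints null
        System.out.println(currentWatermark(0L)); // prints 1970-01-01T00:00
    }
}
```

A `0` would surface as 1970-01-01T00:00, which looks like a real (if old) watermark, whereas null cannot be mistaken for one.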
> >
> >  >  3.3 `followingWatermark()`: TBH, I find this name quite confusing
> >  > and it took me a while until I understood its purpose. If I understand
> >  > correctly, it makes only sense to call it in the `onTimer()` method
> >  > because only there, we know the next watermark. Would it be possible
> >  > to rename it to `triggeringWatermark()` and ensure that it is only
> >  > callable in `onTimer()` and not in `eval()` and `finish()`?
> >
> > I adjusted the FLIP on this part to be more in sync with DataStream API.
> > A single `currentWatermark()` method should be enough, with the same
> > semantics as DataStream's ProcessFunction.
> >
> >  >  3.4 Are the methods to register periodic timers necessary? How would
> >  > they be used? How much do we help the user? What becomes easier / less
> >  > boilerplate code? How much do we confuse them with additional API
> >  > surface?
> >
> > I removed the periodic timers from the FLIP. A `registerOnNextWatermark`
> > should also do the job. We can still add periodic timers in the future.
> >
> > 4. "Public Interfaces", "TableSemantics":
> >
> >  >  4.1 rename to `TableArgSemantics`?
> >
> > Renamed to `TableSemantics` as we also need it for table args with row
> > semantics.
> >
> >  >  4.2 Why are the partitionBy and orderBy column indexes in
> >  > two-dimensional arrays? Wouldn't simple arrays be sufficient?
> >  > Using indexes also means that we don't allow partitionBy/orderBy
> >  > expressions, right? There's a simple work around using CTEs,views
> >  > or nested queries, but just want to mention it.
> >
> > Most interfaces use index paths (e.g. SupportsProjectionPushDown). But I
> > agree that we can simplify the interface. Nested PARTITION BY is
> > currently not supported.
> >
> >  >  4.3 Why are the copartition args not indexes but names?
> >  >  4.4 rename methods `xxxColumns()` -> `xxxColumnIdxs()`?
> >
> > For copartition args it is difficult to define what an index would mean.
> > Handling names makes it way easier in this case. `xxxColumnIdxs` does
> > not read very nicely.
> >
> > 5. "Public Interfaces", "TypeInference":
> >
> >  >  5.1 Could `getStaticArguments()` return an empty list instead of
> >  > Optional<List<StaticArgument>>?
> >
> > Empty list would mean a non-arg function. But since there is still the
> > possibility to define an entire `inputTypeStrategy()` instead of a
> > static list of args the Optional is required.
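
The difference between "no static arguments declared" and "declared as taking zero arguments" can be sketched in plain Java as follows. `StaticArgsSketch` and `describe` are hypothetical, and argument names are modeled as plain strings instead of `StaticArgument` to keep the example self-contained.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration of the three possible states; not a Flink class. */
final class StaticArgsSketch {
    static String describe(Optional<List<String>> staticArguments) {
        if (!staticArguments.isPresent()) {
            // No static list at all: the signature comes from an input type strategy.
            return "signature defined by inputTypeStrategy";
        }
        List<String> args = staticArguments.get();
        return args.isEmpty()
                ? "function takes no arguments"
                : "static arguments: " + args;
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.empty()));
        System.out.println(describe(Optional.of(List.of())));
        System.out.println(describe(Optional.of(List.of("input", "uid"))));
    }
}
```

Collapsing the first two cases into an empty list would make them indistinguishable, which is the reason given above for keeping the Optional.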
> >
> > 6. "Public Interfaces", "FunctionDefinition":
> >
> >  >  6.1 `getInputChangelogMode()` do we need to be able to specify
> >  > different requirements if a PTF supports multiple inputs?
> >  >  6.2 `getOutputChangelogMode()` doesn't the mode depend on the input
> >  > changelog mode? Wouldn't we need to pass this information into
> >  > the method?
> >
> > Very good feedback. That was a bug in the FLIP. Of course we should be
> > able to define a supported changelog mode per input and derive the
> > output based on the actual input.
> >
> > The ArgumentTraits now contain a `SUPPORTS_RETRACT` trait for specifying
> > what an argument supports. An additional `ChangelogFunction` interface
> > keeps the `FunctionDefinition` clean. Generally, this is a power user
> > feature and a dedicated interface is the best location. PTFs should
> > rather be append-only if possible.
> >
> > 7. "Proposed Changes", "Implementation Details", "Time Semantics"
> >
> >  >  7.1 "Watermark by default": How is the described behavior different
> >  > from the standard behavior?
> >
> > It's not different. We follow DataStream API semantics and don't
> > reinvent the wheel here, also to avoid semantic bugs in the design.
> >
> > 8. "Proposed Changes", "Implementation Details", "Query Evolution"
> >
> >  >  8.1 I agree that the uid should be optional. However, why not use a
> >  > generated name (PTF-name + count suffix, UUID) if it isn't set? Then
> >  > we don't need to block multiple invocations per query which might not
> >  > be uncommon (UNION ALL multiple PTF branches). Sure, users could just
> >  > set the uid but do we want to fail a query for that?
> >
> > We set a generated name that contains the PTF name. Multiple invocations
> > should be clearly identified by the user. In this case I would rather
> > teach people the concept of uid(), which is not hard to understand,
> > instead of magically assigning uids that are hard to figure out after
> > submission. In DataStream API it is difficult to figure out the
> > auto-assigned uid when it comes to pipeline evolution.
> >
> >  > Wouldn't the side-output example fail due to the same uids?
> >
> > No, it's about the effective uniqueness of uids, not about how often a
> > uid occurs in a query. So for STATEMENT SET, we can enable specifying the
> > same uid multiple times if the planner can create exactly one instance
> > of it, meaning exactly one StreamExecNode in the ExecNodeGraph. If not,
> > an error will be thrown.
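
One way to picture "effective uniqueness" is a check that every uid maps to exactly one planner node, no matter how often the uid is written in the query. The plain-Java sketch below is purely illustrative: `UidRegistrySketch` and the string node identifiers are hypothetical, and the real planner logic is not described in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical uid check; not the planner's actual implementation. */
final class UidRegistrySketch {
    // uid -> identifier of the single node that owns this uid
    private final Map<String, String> nodeByUid = new HashMap<>();

    /** Accepts repeated uids as long as they resolve to the same node. */
    void register(String uid, String nodeId) {
        String existing = nodeByUid.putIfAbsent(uid, nodeId);
        if (existing != null && !existing.equals(nodeId)) {
            throw new IllegalStateException(
                    "uid '" + uid + "' maps to both " + existing + " and " + nodeId);
        }
    }

    public static void main(String[] args) {
        UidRegistrySketch registry = new UidRegistrySketch();
        registry.register("only_once", "node-1"); // first INSERT branch
        registry.register("only_once", "node-1"); // second branch, same shared node: fine
        try {
            registry.register("only_once", "node-2"); // a second instance: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```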
> >
> >  > Finally, I have a question regarding the compatibility with the new
> >  > async state access patterns. AFAICT, the PTF proposal should work
> >  > nicely with these new APIs. Is that assessment correct?
> >
> > I haven't spent much thought on it. But the interfaces should be
> > generic enough. If not, we can add specialized classes that can be used
> > next to @StateHint and come up with methods for that.
> >
> > In general, this whole effort is an iterative feature. We won't be able
> > to consider all capabilities of DataStream API on day one. And this is
> > also not the goal. We don't need to reach 1:1 feature parity with
> > DataStream API but rather power 80% of the use cases and roadblocks that
> > SQL users face.
> >
> > Thanks,
> > Timo
> >
> >
> > On 07.11.24 16:23, David Radley wrote:
> >> Hi Timo,
> >> Thank you for the well structured and referenced Flip.  This looks
> >> really useful.
> >>
> >> Some thoughts:
> >>
> >>
> >>    1.  I wonder what class of scenarios that PTF’s do not do well in
> >> and that DataStream API would be preferable/required? Assuming we have
> >> PTF Map and List support.
> >>    2.  I see
> >>     /**
> >>      * Registers a timer to be fired when the event time watermark
> >>      * passes the given time.
> >>      * Replaces an existing timer under the same name.
> >>      */
> >>     void registerOnTime(String name, T time);
> >>
> >> Does this mean:
> >>         “Registers a timer. After that the time interval, the
> >> watermark is progressed to that time by the framework and the timer is
> >> fired.“
> >>
> >>
> >>    1.  When it says “A PTF can still make progress in time without an
> >> explicit on_time attribute”, how would it do this? Is business as
> >> usual with the watermark generator or is there also something else the
> >> PTF can do?
> >>    2.  When it says “in case of onWatermark the output is rowtime =
> >> TimeContext.followingWatermark() - 1” . It would be good to state in
> >> the Flip  the thinking behind this.
> >>    3.  I agree with Fabian the currentWatermark and followingWatermark
> >> is confusing.
> >>    4.  On  void registerOnPeriodicTime(String name, T relativeTime,
> >> Duration period); what is relativeTime? relativeTime sounds like a
> >> duration.
> >>
> >> Kind regards, David.
> >>
> >> From: Fabian Hüske <>
> >> Date: Monday, 4 November 2024 at 15:41
> >> To: <>
> >> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-440: User-defined SQL operators
> >> / ProcessTableFunction (PTF)
> >> Hi Timo,
> >>
> >> Thanks for the detailed and very well structured FLIP document!
> >> This is an important feature and will enable many more use-cases for
> >> Flink
> >> SQL and Table API.
> >>
> >> I have a few questions / suggestions:
> >>
> >> 1. "Scoping and Simplifications", "Partition and Order Semantics":
> >>    "By default, we require a partitioning for tables with set semantics."
> >> What does "by default" mean? Will there be a case where we won't require
> >> partitioning? What's the non-default case?
> >>
> >> 2. "Public Interfaces", "ProcessTableFunction":
> >>     2.1 Can the `ProcessTableFunction.getKind()` method be `final`?
> >>     2.2 "Public Interfaces", "ProcessTableFunction", `OnTimerContext`:
> >> Should we also expose the timestamp for which the timer was set?
> >>     2.3 (minor) "As" in `Context.getTimeContextAs(Class<T>)` implies IMO
> >> that the method returns an instance of T. Maybe rename to
> >> `getTimeContext(Class<T>)` or `getTimeContextFor(Class<T>)`
> >>
> >> 3. "Public Interfaces", "TimeContext":
> >>    3.1 `currentTime()`: this method name is a bit confusing IMO. I would
> >> say, the "current time" is the WM time, because the WM drives the global
> >> event-time clock of the query. I think it would be good to make it clear
> >> that the method returns the event-time column of the current row.
> >>    3.2 `currentWatermark()`: should we rather emit `0` when no WM has
> >> been received yet?
> >>    3.3 `followingWatermark()`: TBH, I find this name quite confusing and
> >> it took me a while until I understood its purpose. If I understand
> >> correctly, it makes only sense to call it in the `onTimer()` method
> >> because only there, we know the next watermark. Would it be possible to
> >> rename it to `triggeringWatermark()` and ensure that it is only callable
> >> in `onTimer()` and not in `eval()` and `finish()`?
> >>    3.4 Are the methods to register periodic timers necessary? How would
> >> they be used? How much do we help the user? What becomes easier / less
> >> boilerplate code? How much do we confuse them with additional API
> >> surface?
> >>
> >> 4. "Public Interfaces", "TableSemantics":
> >>    4.1 rename to `TableArgSemantics`?
> >>    4.2 Why are the partitionBy and orderBy column indexes in
> >> two-dimensional arrays? Wouldn't simple arrays be sufficient? Using
> >> indexes also means that we don't allow partitionBy/orderBy expressions,
> >> right? There's a simple workaround using CTEs, views or nested queries,
> >> but just want to mention it.
> >>    4.3 Why are the copartition args not indexes but names?
> >>    4.4 rename methods `xxxColumns()` -> `xxxColumnIdxs()`?
> >>
> >> 5. "Public Interfaces", "TypeInference":
> >>    5.1 Could `getStaticArguments()` return an empty list instead of
> >> Optional<List<StaticArgument>>?
> >>
> >> 6. "Public Interfaces", "FunctionDefinition":
> >>    6.1 `getInputChangelogMode()` do we need to be able to specify
> >> different requirements if a PTF supports multiple inputs?
> >>    6.2 `getOutputChangelogMode()` doesn't the mode depend on the input
> >> changelog mode? Wouldn't we need to pass this information into the
> >> method?
> >>
> >> 7. "Proposed Changes", "Implementation Details", "Time Semantics"
> >>    7.1 "Watermark by default": How is the described behavior different
> >> from the standard behavior? All timers fire on watermarks. The only
> >> difference is that we cannot set timers to row timestamps but that
> >> depends on the developer of the function and not on the framework. I
> >> don't disagree with that behavior, I'm just not sure if this needs to be
> >> mentioned at all.
> >>    7.2 "Output Timestamp": I'm not sure if the output timestamp behavior
> >> results in limitations on how PTFs can be used. This is a power-user API
> >> and it might make sense to allow developers to choose what timestamp to
> >> emit. They should of course make sure that the timestamp is not less
> >> than the current watermark. For example our implementation of OVER
> >> aggregates could not be implemented without setting explicit timestamps
> >> because we only collect rows in `eval()` and emit them in `onTimer()`
> >> but preserve the per-row timestamp.
> >>
> >> 8. "Proposed Changes", "Implementation Details", "Query Evolution"
> >>    8.1 I agree that the uid should be optional. However, why not use a
> >> generated name (PTF-name + count suffix, UUID) if it isn't set? Then we
> >> don't need to block multiple invocations per query which might not be
> >> uncommon (UNION ALL multiple PTF branches). Sure, users could just set
> >> the uid but do we want to fail a query for that?
> >>
> >> Finally, I found a few minor mistakes that you might want to fix:
> >>
> >> 1. "Public Interfaces", "ProcessTableFunction", "The collector is
> >> globally available for both eval() , finish(), and onTimer()". Usage of
> >> "both" although there are three items listed.
> >> 2. "Public Interfaces", "TypeInference": Should the first `table()`
> >> method use `EnumSet<ArgTrait> traits` instead of
> >> `EnumSet<StaticArgumentTraits> traits`?
> >> 3. "Proposed Changes", "Implementation Details", "Query Evolution":
> >> Wouldn't the side-output example fail due to the same uids?
> >>
> >> Finally, I have a question regarding the compatibility with the new
> >> async state access patterns. AFAICT, the PTF proposal should work nicely
> >> with these new APIs. Is that assessment correct?
> >>
> >> Thanks, Fabian
> >>
> >> On Mon, Nov 4, 2024 at 2:41 PM Timo Walther <> wrote:
> >>
> >>> Hi David, Hi Shengkai,
> >>>
> >>>   > can I apply a PTF to a stream that doesn't have a time attribute?
> >>>
> >>> Yes, time attributes are optional. This is why the
> >>> REQUIRES_TIME_ATTRIBUTE argument trait exists. If no on_time has been
> >>> specified in the SQL call and the REQUIRES_TIME_ATTRIBUTE trait is not
> >>> present, timers will fire on watermark by default. If there is no
> >>> watermark present, the timers will not fire - similar to DataStream
> >>> API.
> >>> For built-in functions I suggest that we set the
> >>> to return early errors.
> >>>
> >>>   > is @StateHint CountState state referring to the class named Count?
> >>>   > This appears to be a typo.
> >>>
> >>> Yes it was a typo. Fixed. Thanks!
> >>>
> >>>   > 1. How do users register PTF to Flink? It looks like users can
> >>>   > use CREATE FUNCTION to register the PTF.
> >>>
> >>> PTFs behave like scalar or table functions in this regard. You can use
> >>> CREATE FUNCTION, TableEnvironment.createFunction, or inline in Table
> >>> using call(PTFClass.class). No special case.
> >>>
> >>>   > 2. Can the input parameter of PTF be a view? If the input parameter
> >>>   > is a table, how can the developer of PTF know the schema of the input
> >>>   > table? Without type information, I don't know how to extract fields
> >>>   > from the Row or RowData.
> >>>
> >>> Yes, the input parameter can be a table, view or CTE. In theory also a
> >>> subquery should work but that might need more work in the Calcite
> >>> parser/validator.
> >>>
> >>> Regarding type information, this is good feedback. I added the type
> >>> information to the Context. Under
> >>> Context.tableSemanticsFor(arg).getDataType. Built-in functions won't
> >>> need this as they can also access the type via getTypeInference in
> >>> CallContext.
> >>>
> >>>   > 3. I see the API change mentioned that `getInputChangelogMode` and
> >>>   > `getOutputChangelogMode` will be added to FunctionDefinition? Can we
> >>>   > just add these two methods to ProcessTableFunction? After all, other
> >>>   > functions (e.g. scalar function) don't need these two methods.
> >>>
> >>> Unfortunately, this is not possible. As mentioned in the FLIP,
> >>> "Following the specification of FunctionDefinition: Instances of this
> >>> class provide all details necessary to validate a function call and
> >>> perform planning.". So everything for planning should be present in
> >>> FunctionDefinition. For ScalarFunction and other UDFs we can make these
> >>> methods `final`, so it won't be possible to change the append-only
> >>> behavior.
> >>>
> >>>   > 4. If an insert-only stream is converted to a changelog stream by
> >>>   > the PTF, what is the upsert key of the changelog stream?
> >>>
> >>> Upsert keys won't be considered because only retract mode will be
> >>> supported.
> >>>
> >>>   > 5. I see that a new type, DESCRIPTOR, has been added. Can the user
> >>>   > declare a type as DESCRIPTOR directly in the DDL, or is the type
> >>>   > only available to PTFs? If it is only used for PTFs, do we need to
> >>>   > provide the DataTypes#DESCRIPTOR() method to allow the user to
> >>>   > declare the type? Or is it just a type for internal use like
> >>>   > DistinctType. Also, can you add the conversion relationship between
> >>>   > this type and other types?
> >>>
> >>> DESCRIPTOR is a logical type similar to SymbolType. In order to define
> >>> input signatures, I thought it makes sense to also expose the
> >>> DataTypes#DESCRIPTOR() method. It is a bit more public and more useful
> >>> than SymbolType. Similar to INTERVAL it should not be used in DDL. It
> >>> cannot be cast to other types. I updated the section in the FLIP with
> >>> more information.
> >>>
> >>> Best,
> >>> Timo
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On 03.11.24 07:12, Shengkai Fang wrote:
> >>>> Hi, Timo.
> >>>>
> >>>> Thanks for your proposal. This FLIP greatly extends the ease of use of
> >>>> SQL!
> >>>> But I have some questions about this FLIP:
> >>>>
> >>>> 1. How do users register PTF to Flink? It looks like users can use
> >>>> CREATE FUNCTION to register the PTF.
> >>>>
> >>>> 2. Can the input parameter of PTF be a view? If the input parameter is
> >>>> a table, how can the developer of PTF know the schema of the input
> >>>> table? Without type information, I don't know how to extract fields
> >>>> from the Row or RowData.
> >>>>
> >>>> 3. I see the API change mentioned that `getInputChangelogMode` and
> >>>> `getOutputChangelogMode` will be added to FunctionDefinition? Can we
> >>>> just add these two methods to ProcessTableFunction? After all, other
> >>>> functions (e.g. scalar function) don't need these two methods.
> >>>>
> >>>> 4. If an insert-only stream is converted to a changelog stream by the
> >>>> PTF, what is the upsert key of the changelog stream?
> >>>>
> >>>> 5. I see that a new type, DESCRIPTOR, has been added. Can the user
> >>>> declare a type as DESCRIPTOR directly in the DDL, or is the type only
> >>>> available to PTFs? If it is only used for PTFs, do we need to provide
> >>>> the DataTypes#DESCRIPTOR() method to allow the user to declare the
> >>>> type? Or is it just a type for internal use like DistinctType. Also,
> >>>> can you add the conversion relationship between this type and other
> >>>> types?
> >>>>
> >>>> Best,
> >>>> Shengkai
> >>>>
> >>>> On Sat, Nov 2, 2024 at 00:03, David Anderson <> wrote:
> >>>>
> >>>>> Timo, thanks for the response. I have a few more questions.
> >>>>>
> >>>>>> as mentioned in "Scoping and Simplifications" a PTF will not support
> >>>>>> late events. It will filter them out. We have to solve the late
> >>>>>> events topic at an earlier stage in the SQL pipeline. This is a
> >>>>>> different FLIP discussion. Not every SQL operator should deal with
> >>>>>> late events in a different way.
> >>>>>
> >>>>>
> >>>>> So long as we can someday cleanly handle late events, I'm okay with
> >>>>> this.
> >>>>>
> >>>>> Some followup questions: can I apply a PTF to a stream that doesn't
> >>>>> have a time attribute? The section on time and watermarks seems to
> >>>>> allow for this, but it also seems to expect that watermarks are
> >>>>> present, regardless. What if they aren't? I'm wondering if there's a
> >>>>> case where there are no watermarks, and a PTF registers a timer that
> >>>>> can never fire.
> >>>>>
> >>>>> And one unrelated question:
> >>>>>
> >>>>> In the CountWithTimeout example, is @StateHint CountState state
> >>>>> referring to the class named Count? This appears to be a typo.
> >>>>>
> >>>>> David
> >>>>>
> >>>>> On Fri, Nov 1, 2024 at 3:03 PM Timo Walther <> wrote:
> >>>>>
> >>>>>> Hi David,
> >>>>>>
> >>>>>> as mentioned in "Scoping and Simplifications" a PTF will not support
> >>>>>> late events. It will filter them out. We have to solve the late
> >>>>>> events topic at an earlier stage in the SQL pipeline. This is a
> >>>>>> different FLIP discussion. Not every SQL operator should deal with
> >>>>>> late events in a different way.
> >>>>>>
> >>>>>>    > Is there a guarantee that watermarking will be applied upstream
> >>>>>>    > of the split between the two statements in the resulting job
> >>>>>>    > graph?
> >>>>>>
> >>>>>> Yes, the uid will make the PTF unique in the entire job graph. If
> >>>>>> this is not possible (e.g. because the two PTF invocations are used
> >>>>>> in completely different statements and a common subgraph cannot be
> >>>>>> determined), we can throw an error.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 01.11.24 12:13, David Anderson wrote:
> >>>>>>>> 3. Change of interfaces for multiple output tables
> >>>>>>>> Currently, I think using a STATEMENT SET should be enough for side
> >>>>>>>> output semantics. I have added an example in section for that.
> >>>>>>>
> >>>>>>> I question whether this really works. Is there a guarantee that
> >>>>>>> watermarking will be applied upstream of the split between the two
> >>>>>>> statements in the resulting job graph? Otherwise, important use
> >>>>>>> cases like sending late events to a side output will behave
> >>>>>>> non-deterministically, and be useless.
> >>>>>>>
> >>>>>>> David
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <> wrote:
> >>>>>>>
> >>>>>>>> Hi Xuyang,
> >>>>>>>>
> >>>>>>>> thanks for the good questions.
> >>>>>>>>
> >>>>>>>> 1. What happens if the TTLs for these different StateHints are not
> >>>>>>>> the same?
> >>>>>>>>
> >>>>>>>> The eval() fully determines available state and their TTL. Helper
> >>>>>>>> methods such as onTimer() and finish() can reference a subset of
> >>>>>>>> declared state. It is not necessary that the helper methods declare
> >>>>>>>> all state properties one more time. The name should be sufficient
> >>>>>>>> and we should forbid setting additional properties.
> >>>>>>>>
> >>>>>>>> 2. I believe the named arguments introduced in FLIP-387[1] can also
> >>>>>>>> be applied to this ProcessTableFunction, right?
> >>>>>>>>
> >>>>>>>> Absolutely, the PTF actually needs named arguments, especially for
> >>>>>>>> optional fields such as uid or on_time. For forward compatibility,
> >>>>>>>> I would even suggest that PTFs only support named arguments. Not
> >>>>>>>> sure if we can enforce that.
> >>>>>>>>
> >>>>>>>> 3. Will we expose the original RowKind in the eval method's row
> >>>>>>>> input?
> >>>>>>>>
> >>>>>>>> Yes, it's likely that only advanced users will make use of that. In
> >>>>>>>> that case users have to work with Row/RowData. It's more likely
> >>>>>>>> that built-in functions will make use of this. The default
> >>>>>>>> changelog mode for both input and output is append.
> >>>>>>>>
> >>>>>>>> 4. Are we allowing users to define both styles simultaneously
> >>>>>>>>
> >>>>>>>> Yes. Context is optional. And state access in helper methods
> >>>>>>>> (finish/onTimer) as well. This reduces the overhead in case a PTF
> >>>>>>>> runs in a container/other process.
> >>>>>>>>
> >>>>>>>> I will update the FLIP to reflect these answers.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 01.11.24 05:10, Xuyang wrote:
> >>>>>>>>> Hi, Timo.
> >>>>>>>>>
> >>>>>>>>> Thank you for this great work! When I previously introduced the
> >>>>>>>>> session window TVF, I was contemplating how to enable users to
> >>>>>>>>> define a PTF in SQL. I'm glad to see this work being discussed and
> >>>>>>>>> that it has improved the integration with the DataStream API.
> >>>>>>>>>
> >>>>>>>>> After reading the entire FLIP, I have a few questions that I hope
> >>>>>>>>> you can address.
> >>>>>>>>>
> >>>>>>>>> 1. I noticed that in the example, the same field (e.g.,
> >>>>>>>>> CountState) can declare a StateHint in the eval, onTimer, and
> >>>>>>>>> finish methods. What happens if the TTLs for these different
> >>>>>>>>> StateHints are not the same?
> >>>>>>>>>
> >>>>>>>>> 2. I believe the named arguments introduced in FLIP-387[1] can
> >>>>>>>>> also be applied to this ProcessTableFunction, right?
> >>>>>>>>>
> >>>>>>>>> 3. In our UDAFs, we expect users to provide accumulate and retract
> >>>>>>>>> methods to handle input data for +I/+U and -U/-D. However, in the
> >>>>>>>>> eval method of a ScalarFunction/UDTF, users do not have visibility
> >>>>>>>>> into the input's RowKind. In the new PTF, will we expose the
> >>>>>>>>> original RowKind in the eval method's row input, allowing users to
> >>>>>>>>> determine the row's RowKind themselves?
> >>>>>>>>>
> >>>>>>>>> 4. I noticed that in the examples, the eval method sometimes
> >>>>>>>>> includes the Context, @StateHint fields, and the input data (Row
> >>>>>>>>> input), while other times it only consists of the input data. Are
> >>>>>>>>> we allowing users to define both styles simultaneously?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>>         Best!
> >>>>>>>>>         Xuyang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> At 2024-10-31 21:57:37, "Timo Walther" <> wrote:
> >>>>>>>>>> Hi everyone,
> >>>>>>>>>>
> >>>>>>>>>> thanks for all the feedback I received so far. I had very healthy
> >>>>>>>>>> discussions with various people both online and offline at
> >>>>>>>>>> Current and Flink Forward Berlin. The general user responses were
> >>>>>>>>>> also very positive. The FLIP should be ready to start a VOTE
> >>>>>>>>>> thread.
> >>>>>>>>>>
> >>>>>>>>>> This is the last call for feedback. I would start a VOTE tomorrow
> >>>>>>>>>> if there are no objections. Happy to take further feedback during
> >>>>>>>>>> implementation as well.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> On 30.10.24 14:34, Timo Walther wrote:
> >>>>>>>>>>> Hi Jim,
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Multiple output tables
> >>>>>>>>>>>
> >>>>>>>>>>>      > Does the target_table need to be specified in the SELECT
> >>>>>>>>>>>      > clause?
> >>>>>>>>>>>
> >>>>>>>>>>> No. Similar to reading from a regular table. The filter column
> >>>>>>>>>>> does not have to be part of the SELECT list.
> >>>>>>>>>>>
> >>>>>>>>>>>      > It seems like the two target_table could have separate
> >>>>>>>>>>>      > schemas defined.
> >>>>>>>>>>>
> >>>>>>>>>>> That is true. The SELECT is responsible for transforming the
> >>>>>>>>>>> columns into the target table's schema. The output row of the
> >>>>>>>>>>> PTF might be a union of various columns in this case.
> >>>>>>>>>>>
> >>>>>>>>>>> 10. Support for State TTL
> >>>>>>>>>>>
> >>>>>>>>>>>      > I'd be strongly in favor of doing any interface / base
> >>>>>>>>>>>      > work we need in the initial implementation so that state
> >>>>>>>>>>>      > size can be managed.
> >>>>>>>>>>>
> >>>>>>>>>>> I agree, State TTL is crucial. I updated the FLIP and added
> >>>>>>>>>>> interfaces to StateTypeStrategy and @StateHint.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Timo
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 23.10.24 17:59, Jim Hughes wrote:
> >>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the answers.  I have a few clarifications
> >>>>>>>>>>>> inlined.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 3. Change of interfaces for multiple output tables
> >>>>>>>>>>>>> Currently, I think using a STATEMENT SET should be enough for
> >>>>>>>>>>>>> side output semantics. I have added an example in section for
> >>>>>>>>>>>>> that.
> >>>>>>>>>>>>> We are still free to add more methods to Context, let the
> >>>>>>>>>>>>> function implement additional interfaces or use more code
> >>>>>>>>>>>>> generation together with @ArgumentHints.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Does the target_table need to be specified in the SELECT
> >>>>>>>>>>>> clause? Or could it read
> >>>>>>>>>>>>
> >>>>>>>>>>>>         INSERT INTO main SELECT a, b
> >>>>>>>>>>>>           FROM FunctionWithSideOutput(input => data, uid => 'only_once')
> >>>>>>>>>>>>           WHERE target_table = 'main';
> >>>>>>>>>>>>         INSERT INTO side SELECT a, b
> >>>>>>>>>>>>           FROM FunctionWithSideOutput(input => data, uid => 'only_once')
> >>>>>>>>>>>>           WHERE target_table = 'side';
> >>>>>>>>>>>> END;
> >>>>>>>>>>>>
> >>>>>>>>>>>> Separately, for clarity, it seems like the two target_table
> >>>>>>>>>>>> could have separate schemas defined.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 10. Support for State TTL
> >>>>>>>>>>>>> Supporting state TTL will be easy. We just need to add a
> >>>>>>>>>>>>> parameter to @StateHint and pass it through.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> If PTFs can have state, I'd be strongly in favor of doing any
> >>>>>>>>>>>> interface / base work we need in the initial implementation so
> >>>>>>>>>>>> that state size can be managed.  If it is just sufficient to
> >>>>>>>>>>>> have hints in the interface, awesome!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Jim
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
