Thank you Timo for your response and the adjustments! I have no further questions or comments. So from my side I'm good to move forward with this FLIP.
Cheers,
Fabian

On Fri, Nov 8, 2024 at 6:03 PM Timo Walther <twal...@apache.org> wrote:

> Hi David,
>
> happy to answer your questions as well:
>
> > 1. I wonder what class of scenarios PTFs do not handle well, where the DataStream API would be preferable/required? Assuming we have PTF Map and List support.
>
> DataStream API supports broadcast state, async operators, custom serializers on state, more flexible state access, custom watermark generators, and processing time. So everything that is closer to the actual runtime (Flink runtime context) might be better suited in DataStream API. But of course, time will reveal what is really necessary in SQL. We shouldn't overload the SQL API.
>
> > 2. Does this mean "Registers a timer. After the time interval, the watermark is progressed to that time by the framework and the timer is fired."
>
> The JavaDoc might need a bit of improvement during the implementation phase. But in general there is nothing special about the semantics. They will be 1:1 like in DataStream API. A watermark W will trigger all timers with timestamp T <= W.
>
> > 1. When it says "A PTF can still make progress in time without an explicit on_time attribute", how would it do this? Is it business as usual with the watermark generator or is there also something else the PTF can do?
>
> Yes, business as usual. The implementation can also work purely on watermarks. The PTF can call `timerContext.registerOnNextWatermark()` if no time attribute is available via the on_time arg.
>
> Similar to DataStream API windows, the rowtime of a watermark-fired operator is always T = W - 1, because it needs to be smaller than the current watermark.
>
> > 2., 3., 4.
>
> I reworked/removed the affected sections.
>
> Cheers,
> Timo
>
> On 08.11.24 17:47, Timo Walther wrote:
> > Hi Fabian,
> >
> > thanks for this massive feedback. It took me quite some time to digest it. I hope I covered most of your concerns. I gave the whole FLIP a major update to reflect your suggestions.
> >
> > 1. "Scoping and Simplifications", "Partition and Order Semantics"
> >
> > > What does "by default" mean? Will there be a case where we won't require partitioning?
> >
> > In Flink terms this means: If you set an argument as `@ArgumentHint(TABLE_AS_SET)`, it will become a KeyedProcessFunction by default. The reason for that is to avoid common mistakes made by DataStream API users and enforce parallel execution. A ProcessFunction (unkeyed) should be avoided, but is sometimes unavoidable. It can be enabled by `@ArgumentHint(TABLE_AS_SET, OPTIONAL_PARTITION_BY)`. Use cases could be e.g. a global COUNT(*) or global sort.
> >
> > 2. "Public Interfaces", "ProcessTableFunction"
> >
> > > 2.1 Can the `ProcessTableFunction.getKind()` method be `final`?
> >
> > Absolutely. That was a typo. Fixed.
> >
> > > 2.2 Should we also expose the timestamp for which the timer was set?
> >
> > The timestamp is exposed via `OnTimerContext.timeContext().time()`.
> >
> > > 2.3 (minor) "As" in `Context.getTimeContextAs(Class<T>)` implies IMO that the method returns an instance of T. Maybe rename to `getTimeContext(Class<T>)` or `getTimeContextFor(Class<T>)`
> >
> > Makes sense. I renamed it to `timeContext()` to keep it short.
> >
> > 3. "Public Interfaces", "TimeContext":
> >
> > > 3.1 `currentTime()`: this method name is a bit confusing IMO. I would say, the "current time" is the WM time, because the WM drives the global event-time clock of the query. I think it would be good to make it clear that the method returns the event-time column of the current row.
> >
> > I reworked the JavaDoc of the interface and call it `time()` now. A short and generic term might be the best for this method. `rowtime()` does not fit entirely as it is also used within OnTimerContext.
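The timer semantics described above can be illustrated with a small, self-contained Java model: a watermark W fires every named timer with timestamp T <= W, re-registering a name replaces the earlier timer, output of a `registerOnNextWatermark()` timer carries rowtime W - 1, and the current watermark is null until the first watermark arrives (see point 3.2). This is a hypothetical sketch, not Flink's implementation: `TimerModel`, `Fired`, and `advanceWatermark` are invented names, and the assumption that a named timer emits with rowtime T is borrowed from DataStream API behavior rather than stated in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of watermark-driven PTF timers; not Flink code. */
class TimerModel {

    /** A fired timer: its name, the time it fired for, and the rowtime of its output. */
    static final class Fired {
        final String name;
        final long timerTime;
        final long outputRowtime;

        Fired(String name, long timerTime, long outputRowtime) {
            this.name = name;
            this.timerTime = timerTime;
            this.outputRowtime = outputRowtime;
        }

        @Override
        public String toString() {
            return name + "@" + timerTime + " -> rowtime " + outputRowtime;
        }
    }

    // Named event-time timers; re-registering a name replaces the previous timer.
    private final Map<String, Long> timers = new HashMap<>();
    // Set by registerOnNextWatermark(); fires on the next watermark, whatever its value.
    private boolean fireOnNextWatermark = false;
    // Null until the first watermark has been received.
    private Long currentWatermark = null;

    void registerOnTime(String name, long time) {
        timers.put(name, time);
    }

    void registerOnNextWatermark() {
        fireOnNextWatermark = true;
    }

    Long currentWatermark() {
        return currentWatermark;
    }

    /** Advances the watermark to w and returns the timers it fires, ordered by timestamp. */
    List<Fired> advanceWatermark(long w) {
        currentWatermark = w;
        List<Fired> fired = new ArrayList<>();
        // A watermark W fires every named timer with timestamp T <= W.
        // Assumption: as in DataStream API, such output keeps rowtime T.
        timers.entrySet().stream()
                .filter(e -> e.getValue() <= w)
                .sorted(Map.Entry.comparingByValue())
                .forEach(e -> fired.add(new Fired(e.getKey(), e.getValue(), e.getValue())));
        for (Fired f : fired) {
            timers.remove(f.name); // a timer fires only once
        }
        // Watermark-fired output gets rowtime W - 1 so it stays below the current watermark.
        if (fireOnNextWatermark) {
            fired.add(new Fired("next-watermark", w, w - 1));
            fireOnNextWatermark = false;
        }
        return fired;
    }
}
```

For instance, with timer "a" re-registered from 10 to 15 and timer "b" at 20, a watermark of 12 fires nothing, a watermark of 20 fires "a" and then "b", and a later `registerOnNextWatermark()` fires on watermark 30 with rowtime 29.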
> >
> > > 3.2 `currentWatermark()`: should we rather emit `0` when no WM has been received yet?
> >
> > The value is not a Long data type. 0 would just be confusing as a LocalDateTime. It's better to make it explicit in this case and return null.
> >
> > > 3.3 `followingWatermark()`: TBH, I find this name quite confusing and it took me a while until I understood its purpose. If I understand correctly, it only makes sense to call it in the `onTimer()` method because only there do we know the next watermark. Would it be possible to rename it to `triggeringWatermark()` and ensure that it is only callable in `onTimer()` and not in `eval()` and `finish()`?
> >
> > I adjusted the FLIP on this part to be more in sync with DataStream API. A single `currentWatermark()` method should be enough, with the same semantics as DataStream's ProcessFunction.
> >
> > > 3.4 Are the methods to register periodic timers necessary? How would they be used? How much do we help the user? What becomes easier / less boilerplate code? How much do we confuse them with additional API surface?
> >
> > I removed the periodic timers from the FLIP. A `registerOnNextWatermark` should also do the job. We can still add periodic timers in the future.
> >
> > 4. "Public Interfaces", "TableSemantics":
> >
> > > 4.1 rename to `TableArgSemantics`?
> >
> > Renamed to `TableSemantics` as we also need it for table args with row semantics.
> >
> > > 4.2 Why are the partitionBy and orderBy column indexes in two-dimensional arrays? Wouldn't simple arrays be sufficient? Using indexes also means that we don't allow partitionBy/orderBy expressions, right? There's a simple workaround using CTEs, views or nested queries, but just want to mention it.
> >
> > Most interfaces use index paths (e.g. SupportsProjectionPushDown). But I agree that we can simplify the interface. Nested PARTITION BY is currently not supported.
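Point 4.2 can be illustrated as follows: as long as nested PARTITION BY is not supported, every index path has length 1, so a flat `int[]` carries the same information as the `int[][]` index paths used by interfaces such as SupportsProjectionPushDown. The Java sketch below is a hypothetical helper (`PartitionIndexes` is not part of any Flink interface) that flattens index paths, rejects nested ones, and uses the flat indexes to extract a partition key from a row modelled as `Object[]`.

```java
import java.util.Arrays;

/** Hypothetical helper for point 4.2; not part of any Flink interface. */
final class PartitionIndexes {

    private PartitionIndexes() {}

    /** Converts index paths such as {{2}, {0}} into flat indexes {2, 0}. */
    static int[] flatten(int[][] indexPaths) {
        int[] flat = new int[indexPaths.length];
        for (int i = 0; i < indexPaths.length; i++) {
            if (indexPaths[i].length != 1) {
                // A longer path such as {1, 0} would address a nested field.
                throw new IllegalArgumentException(
                        "Nested PARTITION BY is not supported: " + Arrays.toString(indexPaths[i]));
            }
            flat[i] = indexPaths[i][0];
        }
        return flat;
    }

    /** Extracts the partition key of a row (modelled as Object[]) using flat indexes. */
    static Object[] partitionKey(Object[] row, int[] partitionColumns) {
        Object[] key = new Object[partitionColumns.length];
        for (int i = 0; i < partitionColumns.length; i++) {
            key[i] = row[partitionColumns[i]];
        }
        return key;
    }
}
```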
> >
> > > 4.3 Why are the copartition args not indexes but names?
> > > 4.4 rename methods `xxxColumns()` -> `xxxColumnIdxs()`?
> >
> > For copartition args it is difficult to define what an index would mean. Handling names makes it way easier in this case. `xxxColumnIdxs` does not read very nicely.
> >
> > 5. "Public Interfaces", "TypeInference":
> >
> > > 5.1 Could `getStaticArguments()` return an empty list instead of Optional<List<StaticArgument>>?
> >
> > An empty list would mean a function without arguments. But since it is still possible to define an entire `inputTypeStrategy()` instead of a static list of args, the Optional is required.
> >
> > 6. "Public Interfaces", "FunctionDefinition":
> >
> > > 6.1 `getInputChangelogMode()` do we need to be able to specify different requirements if a PTF supports multiple inputs?
> > > 6.2 `getOutputChangelogMode()` doesn't the mode depend on the input changelog mode? Wouldn't we need to pass this information into the method?
> >
> > Very good feedback. That was a bug in the FLIP. Of course we should be able to define a supported changelog mode per input and derive the output based on the actual input.
> >
> > The ArgumentTraits now contain a `SUPPORTS_RETRACT` trait for specifying what an argument supports. An additional `ChangelogFunction` interface keeps the `FunctionDefinition` clean. Generally, this is a power user feature and a dedicated interface is the best location. PTFs should rather be append-only if possible.
> >
> > 7. "Proposed Changes", "Implementation Details", "Time Semantics"
> >
> > > 7.1 "Watermark by default": How is the described behavior different from the standard behavior?
> >
> > It's not different. We follow DataStream API semantics. We don't reinvent the wheel here. This also avoids semantic bugs in the design.
> >
> > 8. "Proposed Changes", "Implementation Details", "Query Evolution"
> >
> > > 8.1 I agree that the uid should be optional. However, why not use a generated name (PTF-name + count suffix, UUID) if it isn't set? Then we don't need to block multiple invocations per query which might not be uncommon (UNION ALL multiple PTF branches). Sure, users could just set the uid but do we want to fail a query for that?
> >
> > We set a generated name that contains the PTF name. Multiple invocations should be clearly identified by the user. In this case I would rather teach people the concept of uid(), which is not hard to understand, instead of magically assigning uids that are hard to figure out after submission. In DataStream API it is difficult to figure out the auto-assigned uid when it comes to pipeline evolution.
> >
> > > Wouldn't the side-output example fail due to the same uids?
> >
> > No, it's about the effective uniqueness of uids, not about how often a uid occurs in a query. So for STATEMENT SET, we can enable specifying the same uid multiple times if the planner can create exactly one instance of it, meaning exactly one StreamExecNode in the ExecNodeGraph. If not, an error will be thrown.
> >
> > > Finally, I have a question regarding the compatibility with the new async state access patterns. AFAICT, the PTF proposal should work nicely with these new APIs. Is that assessment correct?
> >
> > I haven't spent much thought on it. But the interfaces should be generic enough. If not, we can add specialized classes that can be used next to @StateHint and come up with methods for that.
> >
> > In general, this whole effort is an iterative feature. We won't be able to consider all capabilities of DataStream API on day one. And this is also not the goal. We don't need to reach 1:1 feature parity with DataStream API but rather power 80% of the use cases and address the roadblocks that SQL users face.
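A rough way to picture the "effective uniqueness" rule from the answer to 8.1: a uid may be repeated as long as every call site sharing it could be planned as exactly one operator. The Java sketch below models this under loudly stated assumptions: `UidValidator` and `Invocation` are hypothetical names, an invocation's `digest` string merely stands in for whatever the planner uses to decide that two call sites become the same StreamExecNode, and the default uid is simply the PTF name (the thread only says the generated name contains it).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the "effective uniqueness" rule for PTF uids; not planner code. */
final class UidValidator {

    /** One PTF call site in a query or STATEMENT SET. */
    static final class Invocation {
        final String ptfName;
        final String uid;    // null if the user did not set a uid
        final String digest; // stand-in for "same function, same input, same arguments"

        Invocation(String ptfName, String uid, String digest) {
            this.ptfName = ptfName;
            this.uid = uid;
            this.digest = digest;
        }
    }

    private UidValidator() {}

    /** Assumption: the generated default is just the PTF name. */
    static String effectiveUid(Invocation inv) {
        return inv.uid != null ? inv.uid : inv.ptfName;
    }

    /**
     * Accepts repeated uids only when all call sites sharing a uid are identical and
     * could therefore be planned as exactly one operator; otherwise throws.
     */
    static void validate(List<Invocation> invocations) {
        Map<String, String> digestByUid = new HashMap<>();
        for (Invocation inv : invocations) {
            String uid = effectiveUid(inv);
            String previous = digestByUid.putIfAbsent(uid, inv.digest);
            if (previous != null && !previous.equals(inv.digest)) {
                throw new IllegalStateException(
                        "uid '" + uid + "' is shared by invocations that cannot become a single operator");
            }
        }
    }
}
```

Under this model, the two identical `FunctionWithSideOutput` calls with uid 'only_once' pass, while two different calls of the same PTF without any uid fail, which matches the intent of asking users to set uids explicitly.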
> >
> > Thanks,
> > Timo
> >
> > On 07.11.24 16:23, David Radley wrote:
> >> Hi Timo,
> >> Thank you for the well-structured and referenced FLIP. This looks really useful.
> >>
> >> Some thoughts:
> >>
> >> 1. I wonder what class of scenarios PTFs do not handle well, where the DataStream API would be preferable/required? Assuming we have PTF Map and List support.
> >> 2. I see
> >>    /**
> >>     * Registers a timer to be fired when the event time watermark passes the given time.
> >>     * Replaces an existing timer under the same name.
> >>     */
> >>    void registerOnTime(String name, T time);
> >>
> >>    Does this mean: "Registers a timer. After the time interval, the watermark is progressed to that time by the framework and the timer is fired."
> >>
> >> 1. When it says "A PTF can still make progress in time without an explicit on_time attribute", how would it do this? Is it business as usual with the watermark generator or is there also something else the PTF can do?
> >> 2. When it says "in case of onWatermark the output is rowtime = TimeContext.followingWatermark() - 1", it would be good to state in the FLIP the thinking behind this.
> >> 3. I agree with Fabian that currentWatermark and followingWatermark are confusing.
> >> 4. On void registerOnPeriodicTime(String name, T relativeTime, Duration period); what is relativeTime? relativeTime sounds like a duration.
> >>
> >> Kind regards, David.
> >>
> >> From: Fabian Hüske <fhue...@confluent.io.INVALID>
> >> Date: Monday, 4 November 2024 at 15:41
> >> To: dev@flink.apache.org <dev@flink.apache.org>
> >> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-440: User-defined SQL operators / ProcessTableFunction (PTF)
> >> Hi Timo,
> >>
> >> Thanks for the detailed and very well-structured FLIP document! This is an important feature and will enable many more use-cases for Flink SQL and Table API.
> >>
> >> I have a few questions / suggestions:
> >>
> >> 1. "Scoping and Simplifications", "Partition and Order Semantics": "By default, we require a partitioning for tables with set semantics." What does "by default" mean? Will there be a case where we won't require partitioning? What's the non-default case?
> >>
> >> 2. "Public Interfaces", "ProcessTableFunction":
> >> 2.1 Can the `ProcessTableFunction.getKind()` method be `final`?
> >> 2.2 "Public Interfaces", "ProcessTableFunction", `OnTimerContext`: Should we also expose the timestamp for which the timer was set?
> >> 2.3 (minor) "As" in `Context.getTimeContextAs(Class<T>)` implies IMO that the method returns an instance of T. Maybe rename to `getTimeContext(Class<T>)` or `getTimeContextFor(Class<T>)`
> >>
> >> 3. "Public Interfaces", "TimeContext":
> >> 3.1 `currentTime()`: this method name is a bit confusing IMO. I would say, the "current time" is the WM time, because the WM drives the global event-time clock of the query. I think it would be good to make it clear that the method returns the event-time column of the current row.
> >> 3.2 `currentWatermark()`: should we rather emit `0` when no WM has been received yet?
> >> 3.3 `followingWatermark()`: TBH, I find this name quite confusing and it took me a while until I understood its purpose. If I understand correctly, it only makes sense to call it in the `onTimer()` method because only there do we know the next watermark. Would it be possible to rename it to `triggeringWatermark()` and ensure that it is only callable in `onTimer()` and not in `eval()` and `finish()`?
> >> 3.4 Are the methods to register periodic timers necessary? How would they be used? How much do we help the user? What becomes easier / less boilerplate code? How much do we confuse them with additional API surface?
> >>
> >> 4. "Public Interfaces", "TableSemantics":
> >> 4.1 rename to `TableArgSemantics`?
> >> 4.2 Why are the partitionBy and orderBy column indexes in two-dimensional arrays? Wouldn't simple arrays be sufficient? Using indexes also means that we don't allow partitionBy/orderBy expressions, right? There's a simple workaround using CTEs, views or nested queries, but just want to mention it.
> >> 4.3 Why are the copartition args not indexes but names?
> >> 4.4 rename methods `xxxColumns()` -> `xxxColumnIdxs()`?
> >>
> >> 5. "Public Interfaces", "TypeInference":
> >> 5.1 Could `getStaticArguments()` return an empty list instead of Optional<List<StaticArgument>>?
> >>
> >> 6. "Public Interfaces", "FunctionDefinition":
> >> 6.1 `getInputChangelogMode()` do we need to be able to specify different requirements if a PTF supports multiple inputs?
> >> 6.2 `getOutputChangelogMode()` doesn't the mode depend on the input changelog mode? Wouldn't we need to pass this information into the method?
> >>
> >> 7. "Proposed Changes", "Implementation Details", "Time Semantics"
> >> 7.1 "Watermark by default": How is the described behavior different from the standard behavior? All timers fire on watermarks. The only difference is that we cannot set timers to row timestamps but that depends on the developer of the function and not on the framework. I don't disagree with that behavior, I'm just not sure if this needs to be mentioned at all.
> >> 7.2 "Output Timestamp": I'm not sure if the output timestamp behavior results in limitations on how PTFs can be used. This is a power-user API and it might make sense to allow developers to choose what timestamp to emit. They should of course make sure that the timestamp is not less than the current watermark.
> >> For example, our implementation of OVER aggregates could not be implemented without setting explicit timestamps because we only collect rows in `eval()` and emit them in `onTimer()` but preserve the per-row timestamp.
> >>
> >> 8. "Proposed Changes", "Implementation Details", "Query Evolution"
> >> 8.1 I agree that the uid should be optional. However, why not use a generated name (PTF-name + count suffix, UUID) if it isn't set? Then we don't need to block multiple invocations per query which might not be uncommon (UNION ALL multiple PTF branches). Sure, users could just set the uid but do we want to fail a query for that?
> >>
> >> Finally, I found a few minor mistakes that you might want to fix:
> >>
> >> 1. "Public Interfaces", "ProcessTableFunction", "The collector is globally available for both eval() , finish(), and onTimer()". Usage of "both" although there are three items listed.
> >> 2. "Public Interfaces", "TypeInference": Should the first `table()` method use `EnumSet<ArgTrait> traits` instead of `EnumSet<StaticArgumentTraits> traits`?
> >> 3. "Proposed Changes", "Implementation Details", "Query Evolution": Wouldn't the side-output example fail due to the same uids?
> >>
> >> Finally, I have a question regarding the compatibility with the new async state access patterns. AFAICT, the PTF proposal should work nicely with these new APIs. Is that assessment correct?
> >>
> >> Thanks,
> >> Fabian
> >>
> >> On Mon, Nov 4, 2024 at 2:41 PM Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi David, Hi Shengkai,
> >>>
> >>> > can I apply a PTF to a stream that doesn't have a time attribute?
> >>>
> >>> Yes, time attributes are optional. This is why the REQUIRES_TIME_ATTRIBUTE argument trait exists. If no on_time has been specified in the SQL call and the REQUIRES_TIME_ATTRIBUTE trait is not present, timers will fire on watermark by default.
> >>> If there is no watermark present, the timers will not fire - similar to DataStream API. For built-in functions I suggest that we set the REQUIRES_TIME_ATTRIBUTE trait to return early errors.
> >>>
> >>> > is @StateHint CountState state referring to the class named Count?
> >>> > This appears to be a typo.
> >>>
> >>> Yes, it was a typo. Fixed. Thanks!
> >>>
> >>> > 1. How do users register PTF to Flink? It looks like users can use CREATE FUNCTION to register the PTF.
> >>>
> >>> PTFs behave like scalar or table functions in this regard. You can use CREATE FUNCTION, TableEnvironment.createFunction, or inline in Table API using call(PTFClass.class). No special case.
> >>>
> >>> > 2. Can the input parameter of PTF be a view? If the input parameter is a table, how can the developer of PTF know the schema of the input table? Without type information, I don't know how to extract fields from the Row or RowData.
> >>>
> >>> Yes, the input parameter can be a table, view or CTE. In theory a subquery should also work, but that might need more work in the Calcite parser/validator.
> >>>
> >>> Regarding type information, this is good feedback. I added the type information to the Context, under Context.tableSemanticsFor(arg).getDataType. Built-in functions won't need this as they can also access the type via getTypeInference in CallContext.
> >>>
> >>> > 3. I see the API change mentioned that `getInputChangelogMode` and `getOutputChangelogMode` will be added to FunctionDefinition? Can we just add these two methods to ProcessTableFunction? After all, other functions (e.g. scalar functions) don't need these two methods.
> >>>
> >>> Unfortunately, this is not possible.
> >>> As mentioned in the FLIP, "Following the specification of FunctionDefinition: Instances of this class provide all details necessary to validate a function call and perform planning.". So everything for planning should be present in FunctionDefinition. For ScalarFunction and other UDFs we can make these methods `final`, so it won't be possible to change the append-only mode.
> >>>
> >>> > 4. If an insert-only stream is converted to a changelog stream by the PTF, what is the upsert key of the changelog stream?
> >>>
> >>> Upsert keys won't be considered because only retract mode will be supported.
> >>>
> >>> > 5. I see that a new type, DESCRIPTOR, has been added. Can the user declare a type as DESCRIPTOR directly in the DDL, or is the type only available to PTFs? If it is only used for PTFs, do we need to provide the DataTypes#DESCRIPTOR() method to allow the user to declare the type? Or is it just a type for internal use like DistinctType? Also, can you add the conversion relationship between this type and other types?
> >>>
> >>> DESCRIPTOR is a logical type similar to SymbolType. In order to define input signatures, I thought it makes sense to also expose a DataTypes#DESCRIPTOR() method. It is a bit more public and more useful than SymbolType. Similar to INTERVAL, it should not be used in DDL. It cannot be cast to other types. I updated the section in the FLIP with more information.
> >>>
> >>> Best,
> >>> Timo
> >>>
> >>> On 03.11.24 07:12, Shengkai Fang wrote:
> >>>> Hi, Timo.
> >>>>
> >>>> Thanks for your proposal. This FLIP greatly extends the ease of use of SQL!
> >>>> But I have some questions about this FLIP:
> >>>>
> >>>> 1. How do users register PTF to Flink? It looks like users can use CREATE FUNCTION to register the PTF.
> >>>>
> >>>> 2. Can the input parameter of PTF be a view? If the input parameter is a table, how can the developer of PTF know the schema of the input table? Without type information, I don't know how to extract fields from the Row or RowData.
> >>>>
> >>>> 3. I see the API change mentioned that `getInputChangelogMode` and `getOutputChangelogMode` will be added to FunctionDefinition? Can we just add these two methods to ProcessTableFunction? After all, other functions (e.g. scalar functions) don't need these two methods.
> >>>>
> >>>> 4. If an insert-only stream is converted to a changelog stream by the PTF, what is the upsert key of the changelog stream?
> >>>>
> >>>> 5. I see that a new type, DESCRIPTOR, has been added. Can the user declare a type as DESCRIPTOR directly in the DDL, or is the type only available to PTFs? If it is only used for PTFs, do we need to provide the DataTypes#DESCRIPTOR() method to allow the user to declare the type? Or is it just a type for internal use like DistinctType? Also, can you add the conversion relationship between this type and other types?
> >>>>
> >>>> Best,
> >>>> Shengkai
> >>>>
> >>>> David Anderson <da...@alpinegizmo.com> wrote on Sat, Nov 2, 2024 at 00:03:
> >>>>
> >>>>> Timo, thanks for the response. I have a few more questions.
> >>>>>
> >>>>>> as mentioned in "Scoping and Simplifications" a PTF will not support late events. It will filter them out. We have to solve the late events topic at an earlier stage in the SQL pipeline. This is a different FLIP discussion. Not every SQL operator should deal with late events in a different way.
> >>>>>
> >>>>> So long as we can someday cleanly handle late events, I'm okay with this.
> >>>>>
> >>>>> Some follow-up questions: can I apply a PTF to a stream that doesn't have a time attribute?
> >>>>> The section on time and watermarks seems to allow for this, but it also seems to expect that watermarks are present, regardless. What if they aren't? I'm wondering if there's a case where there are no watermarks, and a PTF registers a timer that can never fire.
> >>>>>
> >>>>> And one unrelated question:
> >>>>>
> >>>>> In the CountWithTimeout example, is @StateHint CountState state referring to the class named Count? This appears to be a typo.
> >>>>>
> >>>>> David
> >>>>>
> >>>>> On Fri, Nov 1, 2024 at 3:03 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>
> >>>>>> Hi David,
> >>>>>>
> >>>>>> as mentioned in "Scoping and Simplifications" a PTF will not support late events. It will filter them out. We have to solve the late events topic at an earlier stage in the SQL pipeline. This is a different FLIP discussion. Not every SQL operator should deal with late events in a different way.
> >>>>>>
> >>>>>> > Is there a guarantee that watermarking will be applied upstream of the split between the two statements in the resulting job graph?
> >>>>>>
> >>>>>> Yes, the uid will make the PTF unique in the entire job graph. If this is not possible (e.g. because the two PTF invocations are used in completely different statements and a common subgraph cannot be determined), we can throw an error.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>> On 01.11.24 12:13, David Anderson wrote:
> >>>>>>>> 3. Change of interfaces for multiple output tables
> >>>>>>>> Currently, I think using a STATEMENT SET should be enough for side output semantics. I have added an example in section 5.2.3.2 for that.
> >>>>>>>
> >>>>>>> I question whether this really works.
> >>>>>>> Is there a guarantee that watermarking will be applied upstream of the split between the two statements in the resulting job graph? Otherwise, important use cases like sending late events to a side output will behave non-deterministically and be useless.
> >>>>>>>
> >>>>>>> David
> >>>>>>>
> >>>>>>> On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi Xuyang,
> >>>>>>>>
> >>>>>>>> thanks for the good questions.
> >>>>>>>>
> >>>>>>>> 1. What happens if the TTLs for these different StateHints are not the same?
> >>>>>>>>
> >>>>>>>> The eval() fully determines available state and their TTL. Helper methods such as onTimer() and finish() can reference a subset of declared state. It is not necessary that the helper methods declare all state properties one more time. The name should be sufficient and we should forbid setting additional properties.
> >>>>>>>>
> >>>>>>>> 2. I believe the named arguments introduced in FLIP-387[1] can also be applied to this ProcessTableFunction, right?
> >>>>>>>>
> >>>>>>>> Absolutely, the PTF actually needs named arguments, especially for optional fields such as uid or on_time. For forward compatibility, I would even suggest that PTFs only support named arguments. Not sure if we can enforce that.
> >>>>>>>>
> >>>>>>>> 3. Will we expose the original RowKind in the eval method's row input?
> >>>>>>>>
> >>>>>>>> Yes, but it's likely that only advanced users will make use of that. In that case users have to work with Row/RowData. It's more likely that built-in functions will make use of this. The default changelog mode for both input and output is append.
> >>>>>>>>
> >>>>>>>> 4. Are we allowing users to define both styles simultaneously?
> >>>>>>>>
> >>>>>>>> Yes. Context is optional. And state access in helper methods (finish/onTimer) as well. This reduces the overhead in case a PTF runs in a container/other process.
> >>>>>>>>
> >>>>>>>> I will update the FLIP to reflect these answers.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 01.11.24 05:10, Xuyang wrote:
> >>>>>>>>> Hi, Timo.
> >>>>>>>>>
> >>>>>>>>> Thank you for this great work! When I previously introduced the session window TVF, I was contemplating how to enable users to define a PTF in SQL. I'm glad to see this work being discussed and that it has improved the integration with the DataStream API.
> >>>>>>>>>
> >>>>>>>>> After reading the entire FLIP, I have a few questions that I hope you can address.
> >>>>>>>>>
> >>>>>>>>> 1. I noticed that in the example, the same field (e.g., CountState) can declare a StateHint in the eval, onTimer, and finish methods. What happens if the TTLs for these different StateHints are not the same?
> >>>>>>>>>
> >>>>>>>>> 2. I believe the named arguments introduced in FLIP-387[1] can also be applied to this ProcessTableFunction, right?
> >>>>>>>>>
> >>>>>>>>> 3. In our UDAFs, we expect users to provide accumulate and retract methods to handle input data for +I/+U and -U/-D. However, in the eval method of a ScalarFunction/UDTF, users do not have visibility into the input's RowKind. In the new PTF, will we expose the original RowKind in the eval method's row input, allowing users to determine the row's RowKind themselves?
> >>>>>>>>>
> >>>>>>>>> 4. I noticed that in the examples, the eval method sometimes includes the Context, @StateHint fields, and the input data (Row input), while other times it only consists of the input data. Are we allowing users to define both styles simultaneously?
> >>>>>>>>>
> >>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Best!
> >>>>>>>>> Xuyang
> >>>>>>>>>
> >>>>>>>>> At 2024-10-31 21:57:37, "Timo Walther" <twal...@apache.org> wrote:
> >>>>>>>>>> Hi everyone,
> >>>>>>>>>>
> >>>>>>>>>> thanks for all the feedback I received so far. I had very healthy discussions with various people both online and offline at Current and Flink Forward Berlin. The general user responses were also very positive. The FLIP should be ready to start a VOTE thread.
> >>>>>>>>>>
> >>>>>>>>>> This is the last call for feedback. I would start a VOTE tomorrow if there are no objections. Happy to take further feedback during implementation as well.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> On 30.10.24 14:34, Timo Walther wrote:
> >>>>>>>>>>> Hi Jim,
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Multiple output tables
> >>>>>>>>>>>
> >>>>>>>>>>> > Does the target_table need to be specified in the SELECT clause?
> >>>>>>>>>>>
> >>>>>>>>>>> No. Similar to reading from a regular table, the filter column does not need to be part of the SELECT clause.
> >>>>>>>>>>>
> >>>>>>>>>>> > It seems like the two target tables could have separate schemas defined.
> >>>>>>>>>>>
> >>>>>>>>>>> That is true. The SELECT is responsible for transforming the columns into the target table's schema. The output row of the PTF might be a union of various columns in this case.
> >>>>>>>>>>>
> >>>>>>>>>>> 10. Support for State TTL
> >>>>>>>>>>>
> >>>>>>>>>>> > I'd be strongly in favor of doing any interface / base work we need in the initial implementation so that state size can be managed.
> >>>>>>>>>>>
> >>>>>>>>>>> I agree, State TTL is crucial. I updated the FLIP and added interfaces to StateTypeStrategy and @StateHint.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Timo
> >>>>>>>>>>>
> >>>>>>>>>>> On 23.10.24 17:59, Jim Hughes wrote:
> >>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the answers. I have a few clarifications inlined.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 3. Change of interfaces for multiple output tables
> >>>>>>>>>>>>> Currently, I think using a STATEMENT SET should be enough for side output semantics. I have added an example in section 5.2.3.2 for that. We are still free to add more methods to Context, let the function implement additional interfaces or use more code generation together with @ArgumentHints.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Does the target_table need to be specified in the SELECT clause? Or could it read
> >>>>>>>>>>>>
> >>>>>>>>>>>>   EXECUTE STATEMENT SET BEGIN
> >>>>>>>>>>>>     INSERT INTO main SELECT a, b FROM FunctionWithSideOutput(input => data, uid => 'only_once') WHERE target_table = 'main';
> >>>>>>>>>>>>     INSERT INTO side SELECT a, b FROM FunctionWithSideOutput(input => data, uid => 'only_once') WHERE target_table = 'side';
> >>>>>>>>>>>>   END;
> >>>>>>>>>>>>
> >>>>>>>>>>>> Separately, for clarity, it seems like the two target tables could have separate schemas defined.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 10. Support for State TTL
> >>>>>>>>>>>>> Supporting state TTL will be easy. We just need to add a parameter to @StateHint and pass it through.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If PTFs can have state, I'd be strongly in favor of doing any interface / base work we need in the initial implementation so that state size can be managed. If it is just sufficient to have hints in the interface, awesome!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Jim
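The side-output pattern discussed in this exchange (one PTF instance with uid 'only_once', and two INSERTs that each filter on target_table) can be modelled in plain Java. The model shows why each row ends up in exactly one sink and why target_table does not need to appear in the SELECT list. This is a hypothetical sketch, not Flink code: `SideOutputRouting` and `PtfRow` are invented names, and it assumes the PTF output has already been computed once and is shared by both INSERT statements, as the uid is meant to guarantee.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of STATEMENT SET side outputs; not Flink code. */
final class SideOutputRouting {

    /** One output row of the PTF, tagged with the table it is meant for. */
    static final class PtfRow {
        final String targetTable;
        final String a;
        final long b;

        PtfRow(String targetTable, String a, long b) {
            this.targetTable = targetTable;
            this.a = a;
            this.b = b;
        }
    }

    private SideOutputRouting() {}

    /** Models: SELECT a, b FROM <shared PTF output> WHERE target_table = '<target>'. */
    static List<String> selectFor(List<PtfRow> ptfOutput, String target) {
        List<String> result = new ArrayList<>();
        for (PtfRow row : ptfOutput) {
            if (row.targetTable.equals(target)) {
                // target_table is only used for filtering and is projected away.
                result.add(row.a + "," + row.b);
            }
        }
        return result;
    }
}
```

Because both INSERTs read the same single output, a row that the function tags as 'side' (for example, a late event) can never also reach 'main'. Different target schemas would be handled by giving each INSERT its own SELECT list.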