Hi David, Hi Shengkai,

> can I apply a PTF to a stream that doesn't have a time attribute?

Yes, time attributes are optional. This is why the REQUIRES_TIME_ATTRIBUTE argument trait exists. If no on_time argument has been specified in the SQL call and the REQUIRES_TIME_ATTRIBUTE trait is not present, timers will fire on watermarks by default. If no watermark is present, the timers will not fire, similar to the DataStream API. For built-in functions, I suggest that we set the REQUIRES_TIME_ATTRIBUTE trait so that such calls fail early with an error.
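For illustration, here is a minimal, Flink-independent sketch of the default timer semantics described above. The class `WatermarkTimerService` and its methods are hypothetical and only model the behavior: a registered timer fires once a watermark at or beyond its timestamp arrives, so if no watermark ever arrives, the timer never fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of watermark-driven timers; not a Flink API. */
class WatermarkTimerService {
    // Pending timer timestamps, ordered so that the earliest fires first.
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerTimer(long timestamp) {
        pending.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire as a result. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards
        }
        currentWatermark = watermark;
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            fired.add(pending.poll());
        }
        return fired;
    }

    int pendingTimers() {
        return pending.size();
    }
}
```

If the input never carries watermarks, `advanceWatermark` is never called and `pendingTimers()` only grows. That is the situation an early error from the REQUIRES_TIME_ATTRIBUTE trait is meant to prevent.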

> is @StateHint CountState state referring to the class named Count?
> This appears to be a typo.

Yes it was a typo. Fixed. Thanks!

> 1. How do users register PTF to Flink? It looks like users can
> use CREATE FUNCTION to register the PTF.

PTFs behave like scalar or table functions in this regard. You can use CREATE FUNCTION, TableEnvironment.createFunction, or call them inline in the Table API using call(PTFClass.class). No special case is needed.

> 2. Can the input parameter of PTF be a view? If the input parameter is
> a table, how can the developer of PTF know the schema of the input
> table? Without type information, I don't know how to extract fields
> from the Row or RowData.

Yes, the input parameter can be a table, view, or CTE. In theory, a subquery should also work, but that might need more work in the Calcite parser/validator.

Regarding type information, this is good feedback. I added the type information to the Context; it is available via Context.tableSemanticsFor(arg).getDataType(). Built-in functions won't need this, as they can also access the type via getTypeInference in CallContext.
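To show why the schema matters: a generic row only carries positional values, so a function needs the input's row type to map a column name to a position. The sketch below models this with a hypothetical `InputSchema` class standing in for the data type returned by `Context.tableSemanticsFor(arg).getDataType()`. It is a simplification, not Flink's API. `AmountExtractor` and the column name `amount` are also illustrative.

```java
import java.util.List;

/** Hypothetical stand-in for the input table's row type: ordered column names. */
final class InputSchema {
    private final List<String> fieldNames;

    InputSchema(List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }

    /** Resolves a column name to its position, failing early if it is absent. */
    int indexOf(String name) {
        int index = fieldNames.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown column: " + name);
        }
        return index;
    }
}

/** Hypothetical function body that reads one column from a positional row. */
final class AmountExtractor {
    private final int amountIndex;

    AmountExtractor(InputSchema schema) {
        // Resolve the position once, not for every row.
        this.amountIndex = schema.indexOf("amount");
    }

    long amountOf(Object[] row) {
        return (Long) row[amountIndex];
    }
}
```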

> 3. I see the API change mentioned that `getInputChangelogMode` and
> `getOutputChangelogMode` will be added to FunctionDefinition? Can we
> just add these two methods to ProcessTableFunction? After all, other
> functions (e.g. scalar functions) don't need these two methods.

Unfortunately, this is not possible. As mentioned in the FLIP, we follow the specification of FunctionDefinition: "Instances of this class provide all details necessary to validate a function call and perform planning." So everything needed for planning must be present in FunctionDefinition. For ScalarFunction and other UDFs, we can make these methods `final`, so it won't be possible to change the append-only default.
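The design can be sketched in plain Java. The changelog-mode methods live on the common definition type with append-only defaults, and the base class for scalar functions overrides them as `final`, while the PTF base class keeps them overridable. All names here (`Definition`, `ChangelogKind`, `ScalarFunctionBase`, `ProcessTableFunctionBase`, `Upper`, `CountingPtf`) are hypothetical simplifications, not Flink's actual classes or method signatures.

```java
/** Hypothetical simplification of the changelog modes a function can consume or produce. */
enum ChangelogKind { INSERT_ONLY, RETRACT }

/** Hypothetical stand-in for FunctionDefinition: everything the planner needs. */
interface Definition {
    default ChangelogKind inputChangelogMode() {
        return ChangelogKind.INSERT_ONLY;
    }

    default ChangelogKind outputChangelogMode() {
        return ChangelogKind.INSERT_ONLY;
    }
}

/** Scalar functions pin the append-only default; subclasses cannot override it. */
abstract class ScalarFunctionBase implements Definition {
    @Override
    public final ChangelogKind inputChangelogMode() {
        return ChangelogKind.INSERT_ONLY;
    }

    @Override
    public final ChangelogKind outputChangelogMode() {
        return ChangelogKind.INSERT_ONLY;
    }
}

/** PTFs keep the methods overridable, e.g. to emit a retract stream. */
abstract class ProcessTableFunctionBase implements Definition {}

class Upper extends ScalarFunctionBase {}

class CountingPtf extends ProcessTableFunctionBase {
    @Override
    public ChangelogKind outputChangelogMode() {
        return ChangelogKind.RETRACT;
    }
}
```

Overriding `outputChangelogMode()` in a subclass of `ScalarFunctionBase` would fail to compile, which is the guarantee that making these methods `final` provides.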

> 4. If an insert-only stream is converted to a changelog stream by the
> PTF, what is the upsert key of the changelog stream?

Upsert keys won't be considered because only retract mode will be supported.
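As an illustration of why retract mode needs no upsert key, the following sketch turns an insert-only stream into a retract changelog of per-key counts. On every update, it first retracts the previously emitted row and then emits the new one. The `RetractingCounter` class and the `+I`/`-U`/`+U` string encoding are hypothetical. A real PTF would emit rows carrying the corresponding RowKind.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: turns insert-only input into a retract changelog of counts. */
class RetractingCounter {
    private final Map<String, Long> counts = new HashMap<>();

    /** Processes one inserted row and returns the changelog messages it produces. */
    List<String> onInsert(String key) {
        List<String> out = new ArrayList<>();
        Long previous = counts.get(key);
        long next = previous == null ? 1 : previous + 1;
        if (previous == null) {
            out.add("+I(" + key + ", " + next + ")");
        } else {
            // Retract mode: withdraw the old row explicitly instead of relying on a key.
            out.add("-U(" + key + ", " + previous + ")");
            out.add("+U(" + key + ", " + next + ")");
        }
        counts.put(key, next);
        return out;
    }
}
```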

> 5. I see that a new type, DESCRIPTOR, has been added. Can the user
> declare a type as DESCRIPTOR directly in the DDL, or is the type only
> available to PTFs? If it is only used for PTFs, do we need to provide
> the DataTypes#DESCRIPTOR() method to allow the user to declare the
> type? Or is it just a type for internal use like DistinctType. Also,
> can you add the conversion relationship between this type and
> other types?

DESCRIPTOR is a logical type similar to SymbolType. In order to define input signatures, I thought it makes sense to also expose a DataTypes#DESCRIPTOR() method. It is a bit more public and more useful than SymbolType. Similar to INTERVAL, it should not be used in DDL. It cannot be cast to other types. I updated the section in the FLIP with more information.

Best,
Timo





On 03.11.24 07:12, Shengkai Fang wrote:
Hi, Timo.

Thanks for your proposal. This FLIP greatly improves the ease of use of SQL!
But I have some questions about this FLIP:

1. How do users register PTF to Flink? It looks like users can use CREATE
FUNCTION to register the PTF.

2. Can the input parameter of PTF be a view? If the input parameter is a
table, how can the developer of PTF know the schema of the input table?
Without type information, I don't know how to extract fields from the Row
or RowData.

3. I see the API change mentioned that `getInputChangelogMode` and
`getOutputChangelogMode` will be added to FunctionDefinition? Can we just
add these two methods to ProcessTableFunction? After all, other
functions (e.g. scalar functions) don't need these two methods.

4. If an insert-only stream is converted to a changelog stream by the PTF,
what is the upsert key of the changelog stream?

5. I see that a new type, DESCRIPTOR, has been added. Can the user declare
a type as DESCRIPTOR directly in the DDL, or is the type only available to
PTFs? If it is only used for PTFs, do we need to provide the
DataTypes#DESCRIPTOR() method to allow the user to declare the type? Or is
it just a type for internal use like DistinctType. Also, can you add the
conversion relationship between this type and other types?

Best,
Shengkai

On Sat, Nov 2, 2024 at 12:03 AM David Anderson <da...@alpinegizmo.com> wrote:

Timo, thanks for the response. I have a few more questions.

> as mentioned in "Scoping and Simplifications" a PTF will not support
> late events. It will filter them out. We have to solve the late events
> topic at an earlier stage in the SQL pipeline. This is a different FLIP
> discussion. Not every SQL operator should deal with late events in a
> different way.


So long as we can someday cleanly handle late events, I'm okay with this.

Some followup questions: can I apply a PTF to a stream that doesn't have a
time attribute? The section on time and watermarks seems to allow for this,
but it also seems to expect that watermarks are present, regardless. What
if they aren't? I'm wondering if there's a case where there are no
watermarks, and a PTF registers a timer that can never fire.

And one unrelated question:

In the CountWithTimeout example, is @StateHint CountState state referring
to the class named Count? This appears to be a typo.

David

On Fri, Nov 1, 2024 at 3:03 PM Timo Walther <twal...@apache.org> wrote:

Hi David,

as mentioned in "Scoping and Simplifications", a PTF will not support
late events. It will filter them out. We have to solve the late events
topic at an earlier stage in the SQL pipeline. This is a different FLIP
discussion. Not every SQL operator should deal with late events in a
different way.

> Is there a guarantee that watermarking will be applied upstream of
> the split between the two statements in the resulting job graph?

Yes, the uid will make the PTF unique in the entire job graph. If this
is not possible (e.g. because the two PTF invocations are used in
completely different statements and a common subgraph cannot be
determined), we can throw an error.
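A simplified sketch of such a planner-side check: invocations that share a uid must resolve to the same subgraph, which then becomes a single operator; otherwise, planning fails. `UidRegistry` is hypothetical, and comparing invocations as plain strings is a stand-in for comparing plan subgraphs, which is what a real planner would have to do.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical planner-side check: one operator per uid across a statement set. */
class UidRegistry {
    // uid -> normalized description of the invocation (function and arguments).
    private final Map<String, String> invocations = new HashMap<>();

    /**
     * Registers an invocation. Returns false if it creates a new operator and
     * true if it reuses an existing one; throws if the uid is ambiguous.
     */
    boolean register(String uid, String invocation) {
        String existing = invocations.putIfAbsent(uid, invocation);
        if (existing == null) {
            return false; // first occurrence: a new operator is created
        }
        if (existing.equals(invocation)) {
            return true; // same subgraph: both statements share one operator
        }
        throw new IllegalStateException(
                "uid '" + uid + "' is used by different invocations; no common subgraph");
    }
}
```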

Regards,
Timo


On 01.11.24 12:13, David Anderson wrote:
> 3. Change of interfaces for multiple output tables
> Currently, I think using a STATEMENT SET should be enough for side
> output semantics. I have added an example in section 5.2.3.2 for that.

I question whether this really works. Is there a guarantee that
watermarking will be applied upstream of the split between the two
statements in the resulting job graph? Otherwise, important use cases like
sending late events to a side output will behave non-deterministically and
be useless.

David


On Fri, Nov 1, 2024 at 10:26 AM Timo Walther <twal...@apache.org> wrote:

Hi Xuyang,

thanks for the good questions.

1. What happens if the TTLs for these different StateHints are not the
same?

The eval() method fully determines the available state and its TTL. Helper
methods such as onTimer() and finish() can reference a subset of the
declared state. The helper methods do not need to declare all state
properties again. The name should be sufficient, and we should forbid
setting additional properties.
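The rule above can be modeled as a small validation step. The `StateDeclarations` class, its `validateHelper` method, and the boolean `setsTtl` flag are hypothetical and only mirror the described semantics. eval() declares names and TTLs, while a helper method may reference a subset of those names but must not set properties such as TTL again.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;

/** Hypothetical validation of state declarations; not Flink's planner code. */
final class StateDeclarations {
    private final Map<String, Duration> evalState; // name -> TTL, declared by eval()

    StateDeclarations(Map<String, Duration> evalState) {
        this.evalState = evalState;
    }

    /**
     * Validates the state used by a helper method such as onTimer() or finish():
     * it may only reference eval()'s state by name and may not redeclare TTL.
     */
    void validateHelper(String method, Set<String> referencedNames, boolean setsTtl) {
        if (setsTtl) {
            throw new IllegalArgumentException(
                    method + "() must not redeclare state properties such as TTL");
        }
        for (String name : referencedNames) {
            if (!evalState.containsKey(name)) {
                throw new IllegalArgumentException(
                        method + "() references undeclared state '" + name + "'");
            }
        }
    }

    /** The effective TTL always comes from the eval() declaration. */
    Duration ttlOf(String name) {
        return evalState.get(name);
    }
}
```

Because only eval() can set the TTL, the case of conflicting TTLs across methods cannot arise.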

2. I believe the named arguments introduced in FLIP-387[1] can also be
applied to this ProcessTableFunction, right?

Absolutely, the PTF actually needs named arguments, especially for optional
arguments such as uid or on_time. For forward compatibility, I would even
suggest that PTFs only support named arguments. Not sure if we can
enforce that.

3. Will we expose the original RowKind in the eval method's row input?

Yes, although it's likely that only advanced users will make use of that. In
that case, users have to work with Row/RowData. Built-in functions are more
likely to make use of this. The default changelog mode for both input and
output is append.
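A sketch of what a changelog-aware eval() could do with the exposed RowKind, mirroring a UDAF's accumulate/retract split. The local `ChangeKind` enum mirrors the four kinds of Flink's `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) so that the sketch runs without Flink. `RunningSum` and its `eval(kind, value)` signature are hypothetical.

```java
/** Local mirror of the four changelog row kinds (+I, -U, +U, -D); not Flink's class. */
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** Hypothetical eval() body of a changelog-aware function keeping a running sum. */
class RunningSum {
    private long sum;

    void eval(ChangeKind kind, long value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                sum += value; // like a UDAF's accumulate()
                break;
            case UPDATE_BEFORE:
            case DELETE:
                sum -= value; // like a UDAF's retract()
                break;
        }
    }

    long sum() {
        return sum;
    }
}
```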

4. Are we allowing users to define both styles simultaneously?

Yes. Context is optional, and so is state access in helper methods
(finish/onTimer). This reduces the overhead in case a PTF runs in a
container or another process.

I will update the FLIP to reflect these answers.

Thanks,
Timo



On 01.11.24 05:10, Xuyang wrote:
Hi, Timo.

Thank you for this great work! When I previously introduced the session
window TVF, I was contemplating how to enable users to define a PTF in SQL.
I'm glad to see this work being discussed and that it has improved the
integration with the DataStream API.

After reading the entire flip, I have a few questions that I hope you
can address.

1. I noticed that in the example, the same field (e.g., CountState) can
declare a StateHint in the eval, onTimer, and finish methods. What happens
if the TTLs for these different StateHints are not the same?

2. I believe the named arguments introduced in FLIP-387[1] can also be
applied to this ProcessTableFunction, right?

3. In our UDAFs, we expect users to provide accumulate and retract
methods to handle input data for +I/+U and -U/-D. However, in the eval
method of a ScalarFunction/UDTF, users do not have visibility into the
input's RowKind. In the new PTF, will we expose the original RowKind in
the eval method's row input, allowing users to determine the row's
RowKind themselves?

4. I noticed that in the examples, the eval method sometimes includes
the Context, @StateHint fields, and the input data (Row input), while
other times it only consists of the input data. Are we allowing users to
define both styles simultaneously?




[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures







--

       Best!
       Xuyang





At 2024-10-31 21:57:37, "Timo Walther" <twal...@apache.org> wrote:
Hi everyone,

thanks for all the feedback I received so far. I had very healthy
discussions with various people both online and offline at Current and
Flink Forward Berlin. The general user responses were also very
positive. The FLIP should be ready to start a VOTE thread.

This is the last call for feedback. I would start a VOTE tomorrow if
there are no objections. Happy to take further feedback during
implementation as well.

Thanks,
Timo

On 30.10.24 14:34, Timo Walther wrote:
Hi Jim,

3. Multiple output tables

> Does the target_table need to be specified in the SELECT clause?

No. Similar to reading from a regular table, the filter column does not
need to be part of the SELECT clause.

> It seems like the two target_table could have separate schemas defined.

That is true. The SELECT is responsible for transforming the columns into
the target table's schema. The output row of the PTF might be a union of
various columns in this case.
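The statement-set approach to side outputs can be modeled outside of SQL. The PTF emits rows with a target_table discriminator plus the union of all columns, and each INSERT filters on the discriminator and projects onto its own target schema. `SideOutputRouting`, `OutputRow`, and `select` are hypothetical names for this sketch only. Rows are modeled as column-name maps rather than Flink rows.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical model of side outputs via a STATEMENT SET; not Flink code. */
class SideOutputRouting {
    /** A PTF output row: a target_table discriminator plus the union of all columns. */
    static final class OutputRow {
        final String targetTable;
        final Map<String, Object> columns;

        OutputRow(String targetTable, Map<String, Object> columns) {
            this.targetTable = targetTable;
            this.columns = columns;
        }
    }

    /**
     * Models "INSERT INTO t SELECT cols FROM ptf(...) WHERE target_table = 't'":
     * filter on the discriminator, then project onto the target table's schema.
     * The discriminator itself is not part of the projection.
     */
    static List<List<Object>> select(List<OutputRow> rows, String target, List<String> cols) {
        return rows.stream()
                .filter(r -> r.targetTable.equals(target))
                .map(r -> cols.stream().map(r.columns::get).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }
}
```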

10. Support for State TTL

> I'd be strongly in favor of doing any interface / base work we need in
> the initial implementation so that state size can be managed.

I agree, State TTL is crucial. I updated the FLIP and added interfaces
to StateTypeStrategy and @StateHint.
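To illustrate how a TTL bounds state size, here is a minimal keyed value state with lazy expiration. It is a hypothetical sketch. `TtlValueState`, its methods, and the explicit `nowMillis` clock parameter are assumptions and do not reflect the interfaces added to StateTypeStrategy or @StateHint. An entry expires once the TTL has elapsed since its last write and is dropped on the next read.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical keyed value state with a time-to-live; not Flink's TTL implementation. */
class TtlValueState<V> {
    private static final class Entry<T> {
        final T value;
        final long lastWriteMillis;

        Entry(T value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry<V>> entries = new HashMap<>();

    TtlValueState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void update(String key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    /** Returns null if the key is absent or expired; expired entries are removed. */
    V value(String key, long nowMillis) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.lastWriteMillis >= ttlMillis) {
            entries.remove(key); // frees state once the TTL has passed
            return null;
        }
        return e.value;
    }

    int size() {
        return entries.size();
    }
}
```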

Cheers,
Timo



On 23.10.24 17:59, Jim Hughes wrote:
Hi Timo,

Thank you for the answers.  I have a few clarifications inlined.

On Mon, Oct 14, 2024 at 8:07 AM Timo Walther <twal...@apache.org> wrote:

> 3. Change of interfaces for multiple output tables
> Currently, I think using a STATEMENT SET should be enough for side
> output semantics. I have added an example in section 5.2.3.2 for that.
> We are still free to add more methods to Context, let the function
> implement additional interfaces or use more code generation together
> with @ArgumentHints.


Does the target_table need to be specified in the SELECT clause? Or could
it read

EXECUTE STATEMENT SET BEGIN
       INSERT INTO main SELECT a, b
         FROM FunctionWithSideOutput(input => data, uid => 'only_once')
         WHERE target_table = 'main';
       INSERT INTO side SELECT a, b
         FROM FunctionWithSideOutput(input => data, uid => 'only_once')
         WHERE target_table = 'side';
END;

Separately, for clarity, it seems like the two target_table could have
separate schemas defined.


> 10. Support for State TTL
> Supporting state TTL will be easy. We just need to add a parameter to
> @StateHint and pass it through.


If PTFs can have state, I'd be strongly in favor of doing any interface /
base work we need in the initial implementation so that state size can be
managed. If it is just sufficient to have hints in the interface,
awesome!

Cheers,

Jim









