1. Override the function `getRequirements` in `AsyncScalarFunction`

> If the user overrides `requirements()` to omit the `ORDERED`
> requirement, do we allow the operator to return out-of-order results
> or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> behavior (where we allow out-of-order only if it's deemed correct)?

You are right. It should actually be the planner that fully decides whether ORDERED or UNORDERED is safe. For example, if the query is an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the operator should not be allowed to produce unordered results. Via global configuration, we can default to ORDERED so that users aren't confused by unordered output. The UNORDERED mode, however, should only take effect for these simple use cases, and we should throw an exception if UNORDERED would corrupt a changelog or break other downstream operators.
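A minimal sketch of the planner decision described above, assuming the planner already knows whether the input of the async call is append-only. `OutputMode` and `resolveOutputMode` are hypothetical names for illustration, not Flink planner APIs.

```java
// Hypothetical sketch: OutputMode and resolveOutputMode are illustrative names,
// not part of the Flink planner.
class OutputModeResolver {

    enum OutputMode { ORDERED, UNORDERED }

    /**
     * Returns the output mode the planner would actually use for an async scalar call.
     *
     * @param requested       mode requested via global configuration (ORDERED by default)
     * @param inputAppendOnly whether the input of the async call is an append-only stream
     */
    static OutputMode resolveOutputMode(OutputMode requested, boolean inputAppendOnly) {
        if (requested == OutputMode.ORDERED) {
            // Preserving input order is always safe.
            return OutputMode.ORDERED;
        }
        if (!inputAppendOnly) {
            // Reordering changelog messages (e.g. a retraction overtaking the row it retracts)
            // would corrupt the result, so reject instead of silently reordering.
            throw new IllegalStateException(
                    "UNORDERED output is only supported for append-only inputs");
        }
        // e.g. SELECT FUNC(c) FROM t over an append-only table
        return OutputMode.UNORDERED;
    }

    public static void main(String[] args) {
        System.out.println(resolveOutputMode(OutputMode.UNORDERED, true)); // UNORDERED
        System.out.println(resolveOutputMode(OutputMode.ORDERED, false));  // ORDERED
    }
}
```

A real rule would also have to inspect the downstream operators mentioned above; this sketch only checks the input.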

2. In some scenarios, async operators must be executed in sync mode for semantic correctness.

> What about throwing an exception to make it clear to users that using async scalar functions

@Xuyang: A regular SQL user doesn't care whether the function is sync or async. The planner should simply do its best to make the execution performant. I would not throw an exception here. The more exceptions there are, the more struggles and questions from users. Conceptually, async code can also be run synchronously, which is why we should do so to avoid errors.
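The idea of running async code synchronously can be sketched in plain Java, assuming the async function delivers its result through a `CompletableFuture` passed to `eval`. Blocking on that future turns each call into a synchronous one: semantically safe, but without the throughput benefit. `AsyncUpper` and `SyncFallback` are hypothetical names, not Flink classes.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: AsyncUpper stands in for a user-defined async scalar function whose
// eval(...) completes a CompletableFuture; SyncFallback is not a Flink class.
class SyncFallback {

    /** Async-style function: the result is delivered through the future, not returned. */
    static class AsyncUpper {
        public void eval(CompletableFuture<String> result, String input) {
            CompletableFuture.runAsync(() -> result.complete(input.toUpperCase(Locale.ROOT)));
        }
    }

    /** Runs the async eval in sync mode by blocking until the future completes (or times out). */
    static String callSync(AsyncUpper fn, String input, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> result = new CompletableFuture<>();
        fn.eval(result, input);
        return result.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callSync(new AsyncUpper(), "flink", 1_000)); // FLINK
    }
}
```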

3. Hints

@Aitozi: Let's go with global configuration first and introduce hints later. I feel that the more hints we introduce, the harder SQL queries become to maintain.

Regards,
Timo




On 15.12.23 04:51, Xuyang wrote:
Hi, Alan. Thanks for driving this.


Using async to improve throughput has already been done for lookup join, and the improvement is significant, so I think it makes sense to support async scalar functions.
Big +1 for this FLIP.
I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I'm just curious why you chose to introduce a new enum in AsyncScalarFunction instead of using a (global) configuration and query hints (for individual async UDXs) to mark the output mode as 'ordered' or 'unordered', as is done for async lookup join [1] and async UDTF [2].
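For context on what the 'ordered' vs. 'unordered' output mode changes, here is a small single-threaded simulation in plain Java; it is not Flink's operator implementation. Results complete in an arbitrary order; ordered emission holds a result back until all earlier inputs have completed, while unordered emission forwards each result as soon as it is ready. `EmissionOrder` and `emit` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the two output modes; no Flink classes are used.
// Async results may complete in any order. ORDERED emission holds a result back until every
// earlier input has completed; UNORDERED emission forwards each result as soon as it completes.
class EmissionOrder {

    /**
     * @param inputs          input rows in arrival order
     * @param completionOrder indices of inputs in the order their async calls complete
     * @param ordered         true for ORDERED, false for UNORDERED
     * @return results in the order they are emitted downstream
     */
    static List<String> emit(List<String> inputs, int[] completionOrder, boolean ordered) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        Deque<CompletableFuture<String>> pending = new ArrayDeque<>(); // arrival order
        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> f = new CompletableFuture<>();
            futures.add(f);
            pending.addLast(f);
        }
        List<String> emitted = new ArrayList<>();
        for (int idx : completionOrder) {
            CompletableFuture<String> f = futures.get(idx);
            f.complete(inputs.get(idx).toUpperCase(Locale.ROOT)); // simulated async result
            if (!ordered) {
                emitted.add(f.join());
            } else {
                // Emit only the completed prefix of the arrival-order queue.
                while (!pending.isEmpty() && pending.peekFirst().isDone()) {
                    emitted.add(pending.pollFirst().join());
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> in = List.of("a", "b", "c");
        int[] completion = {2, 0, 1};
        System.out.println(emit(in, completion, false)); // [C, A, B]
        System.out.println(emit(in, completion, true));  // [A, B, C]
    }
}
```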


2. In some scenarios, async operators must be executed in sync mode for semantic correctness.


What about throwing an exception to make it clear to users that using async scalar functions in this situation is problematic, instead of silently executing in sync mode? Otherwise users may be confused by the final job graph.


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction









--

     Best!
     Xuyang





On 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:
Hi Alan,
Nice FLIP. I have also explored leveraging the async table function [1] to improve throughput before.

About the configs, what do you think about using hints as mentioned in [1]?

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
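A minimal sketch of how a per-call hint could take precedence over a global setting, in the spirit of the async lookup join's output mode. The option key `table.exec.async-scalar.output-mode` and the hint option `output-mode` are assumptions for illustration, not keys defined by FLIP-400.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: the option and hint keys below are assumptions, not FLIP-400 keys.
class OutputModeConfig {

    enum OutputMode { ORDERED, ALLOW_UNORDERED }

    static final String GLOBAL_KEY = "table.exec.async-scalar.output-mode"; // assumed
    static final String HINT_KEY = "output-mode";                           // assumed

    /** A per-call hint overrides the global configuration; ORDERED is the default. */
    static OutputMode resolve(Map<String, String> globalConf, Map<String, String> hintOptions) {
        String globalValue = globalConf.getOrDefault(GLOBAL_KEY, "ORDERED");
        String raw = hintOptions.getOrDefault(HINT_KEY, globalValue);
        return OutputMode.valueOf(raw.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(GLOBAL_KEY, "allow_unordered");
        System.out.println(resolve(conf, Map.of()));                    // ALLOW_UNORDERED
        System.out.println(resolve(conf, Map.of(HINT_KEY, "ordered"))); // ORDERED
    }
}
```

Starting with global configuration only, as Timo suggests at the top of the thread, corresponds to always passing an empty hint map here.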

Thanks,
Aitozi.

On Thu, Dec 14, 2023 at 17:29, Timo Walther <twal...@apache.org> wrote:

Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be particularly interesting for
accessing REST endpoints and other remote services.

Great that we can generalize and reuse parts of the Python planner rules
and code for this.

I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class which is required in the layered configuration. For
now I would suggest to only allow a global setting for buffer capacity,
timeout, and retry-strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).
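To make the point about a constant key space concrete, here is a sketch assuming three global keys with made-up defaults. The key names follow the ones discussed in this thread but are assumptions here, and a plain `Map` stands in for Flink's layered configuration and `ConfigOption`.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: key names and defaults are assumptions. Because the key space is
// constant, each option can be declared once with a type and a default; per-function keys
// such as "table.exec.async-scalar.catalog.db.func-name.buffer-capacity" cannot be.
class AsyncScalarOptions {

    static final String PREFIX = "table.exec.async-scalar.";
    static final String BUFFER_CAPACITY = PREFIX + "buffer-capacity";
    static final String TIMEOUT_SECONDS = PREFIX + "timeout"; // value assumed to be in seconds
    static final String RETRY_STRATEGY = PREFIX + "retry-strategy";

    static final Set<String> KNOWN_KEYS = Set.of(BUFFER_CAPACITY, TIMEOUT_SECONDS, RETRY_STRATEGY);

    final int bufferCapacity;
    final Duration timeout;
    final String retryStrategy;

    AsyncScalarOptions(Map<String, String> conf) {
        for (String key : conf.keySet()) {
            if (key.startsWith(PREFIX) && !KNOWN_KEYS.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
        // Made-up defaults for illustration only.
        this.bufferCapacity = Integer.parseInt(conf.getOrDefault(BUFFER_CAPACITY, "10"));
        this.timeout = Duration.ofSeconds(Long.parseLong(conf.getOrDefault(TIMEOUT_SECONDS, "180")));
        this.retryStrategy = conf.getOrDefault(RETRY_STRATEGY, "NO_RETRY");
    }
}
```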

2) Semantic declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantic property of a function and should be defined
per function. It impacts the query result and potentially the behavior
of planner rules.

I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully
self-contained and sufficient for planning.

Thus, for `FunctionDefinition.getRequirements(): Set<FunctionRequirement>`
we can add a new requirement `ORDERED`, which should also be the default
for AsyncScalarFunction. `getRequirements()` can be overridden to return
a set without this requirement if the user intends to do this.
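A self-contained model of this proposal, for illustration only: `FunctionRequirement.ORDERED` is the requirement suggested here, and the classes merely mimic the shape of `FunctionDefinition.getRequirements()`. `AsyncScalarFunctionModel`, `OrderedLookup`, `UnorderedLookup` and `requiresOrder` are hypothetical names, not Flink classes.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, self-contained model of the proposal. FunctionRequirement.ORDERED is the
// requirement suggested in this thread; the classes only mimic the shape of
// FunctionDefinition.getRequirements() and are not Flink classes.
class RequirementsSketch {

    enum FunctionRequirement { ORDERED }

    /** Stand-in for AsyncScalarFunction: ORDERED is part of the default requirements. */
    abstract static class AsyncScalarFunctionModel {
        public Set<FunctionRequirement> getRequirements() {
            return EnumSet.of(FunctionRequirement.ORDERED);
        }
    }

    /** Keeps the default, so results must be emitted in input order. */
    static class OrderedLookup extends AsyncScalarFunctionModel {}

    /** Opts out by returning a set without ORDERED. */
    static class UnorderedLookup extends AsyncScalarFunctionModel {
        @Override
        public Set<FunctionRequirement> getRequirements() {
            return EnumSet.noneOf(FunctionRequirement.class);
        }
    }

    /** What a planner rule could check before considering unordered output. */
    static boolean requiresOrder(AsyncScalarFunctionModel fn) {
        return fn.getRequirements().contains(FunctionRequirement.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println(requiresOrder(new OrderedLookup()));   // true
        System.out.println(requiresOrder(new UnorderedLookup())); // false
    }
}
```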


Thanks,
Timo




On 11.12.23 18:43, Piotr Nowojski wrote:
+1 to the idea, I don't have any comments.

Best,
Piotrek

On Thu, 7 Dec 2023 at 07:15, Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:



Thanks. That's fair. I agree that "Remote" isn't always accurate. I
believe that the Python calls are also done asynchronously, so that might
be a reasonable name, so long as there's no confusion between the base
class and the async child class.

On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:

Hi Alan,

Nicely written and makes sense. The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase." This naming seems to
imply/suggest that all Async functions are remote. I wonder if we can
find another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)

Cheers,

Jim

On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:

I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1]

This feature proposes adding a new UDF type, AsyncScalarFunction, which
is invoked just like a normal ScalarFunction but is implemented with an
asynchronous eval method. I had brought this up, including the
motivation, in a previous discussion thread [2].
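A sketch of what "implemented with an asynchronous eval method" could look like, assuming (as with async table functions) that `eval` receives a `CompletableFuture` as its first parameter and completes it later instead of returning a value. The class is plain Java rather than a real `AsyncScalarFunction` subclass; `SlowLookup`, its thread pool, and its 50 ms latency are made up for illustration, and the FLIP page is the authority on the actual signature.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of an asynchronous eval method: the first parameter is a future that the
// function completes later instead of returning a value. Plain Java, no Flink dependency;
// SlowLookup and its 50 ms latency are made up for illustration.
class AsyncEvalSketch {

    static class SlowLookup implements AutoCloseable {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        public void eval(CompletableFuture<String> result, Integer id) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // simulated latency of a remote call
                    result.complete("customer-" + id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    result.completeExceptionally(e);
                }
            });
        }

        @Override
        public void close() {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        try (SlowLookup fn = new SlowLookup()) {
            CompletableFuture<String> result = new CompletableFuture<>();
            fn.eval(result, 7);
            System.out.println(result.join()); // customer-7
        }
    }
}
```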

The purpose is to achieve high-throughput scalar function UDFs while
allowing individual calls to have high latency. It allows scaling up the
parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather
resource-inefficient).
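The throughput argument can be illustrated with a plain-Java sketch in which one caller keeps several slow calls in flight at once, bounded by a semaphore that plays the role a buffer capacity would. This is not Flink's operator; `BoundedAsyncCaller`, `callAll` and the 20 ms `slowSquare` stand-in are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a semaphore caps the number of in-flight calls (the role a buffer
// capacity would play), so a single task overlaps many slow calls without raising the
// parallelism of the whole query. Not Flink code.
class BoundedAsyncCaller {

    static List<Integer> callAll(List<Integer> inputs, int capacity, AtomicInteger maxInFlight)
            throws InterruptedException {
        Semaphore permits = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        try {
            for (Integer x : inputs) {
                permits.acquire(); // back-pressure: wait while `capacity` calls are outstanding
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                futures.add(CompletableFuture.supplyAsync(() -> slowSquare(x), pool)
                        .whenComplete((r, e) -> {
                            inFlight.decrementAndGet();
                            permits.release();
                        }));
            }
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> f : futures) {
                results.add(f.join()); // collect in input order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    /** Stand-in for a high-latency call, e.g. to a REST endpoint. */
    static int slowSquare(int x) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * x;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger max = new AtomicInteger();
        System.out.println(callAll(List.of(1, 2, 3, 4, 5, 6), 3, max)); // [1, 4, 9, 16, 25, 36]
        System.out.println(max.get() <= 3); // true
    }
}
```

With a capacity of 3, the six 20 ms calls overlap instead of running back to back, while the in-flight count never exceeds the capacity.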

In practice, it should enable SQL integration with external services and
systems, which Flink has limited support for at the moment. It should
also allow easier integration with existing libraries which use
asynchronous APIs.

Looking forward to your feedback and suggestions.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs

Thanks,
Alan






