Thanks Piotr and Timo for your responses. To address your comments, Timo:

1) Configuration

> Configuration keys like
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
> are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest to only allow a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).

Yeah, I was trying to find similar examples and couldn't find many because, as you say, they aren't supported. There are things like the metric reporters <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/>, where you can make up a name (e.g. my_jmx_reporter), but you must then list it in metrics.reporters so that the configs can all be iterated over. Doing something similar here would be a bit inelegant. I'm happy to have a global setting, and a future solution could override the global setting once we figure that out.

2) Semantical declaration

> Regarding
> `table.exec.async-scalar.catalog.db.func-name.output-mode`
> this is a semantical property of a function and should be defined
> per-function. It impacts the query result and potentially the behavior
> of planner rules.
>
> I see two options for this either: (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self
> contained and sufficient for planning.
>
> Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overwritten and return a set without this requirement if the user
> intents to do this.

That's a good point. Maybe if we had per-function configs this could make sense as a config, but it doesn't make as much sense when everything is global. Each definition should default to producing a correct result, but it is also useful to let users override that manually when they care more about performance than about certain SQL ordering semantics.

If the user overrides `getRequirements()` to omit the `ORDERED` requirement, do we allow the operator to return out-of-order results, or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED`-type behavior (where we allow out-of-order results only if that is deemed correct)? Having the manual override mean out-of-order seems like a decent starting point, and we could always add `FunctionRequirement.ALLOW_UNORDERED` in the future to allow the more sophisticated behavior.

Thanks,
Alan

On Thu, Dec 14, 2023 at 1:29 AM Timo Walther <twal...@apache.org> wrote:

> Hi Alan,
>
> thanks for proposing this FLIP. It's a great addition to Flink and has
> been requested multiple times. It will be in particular interesting for
> accessing REST endpoints and other remote services.
>
> Great that we can generalize and reuse parts of the Python planner rules
> and code for this.
>
> I have some feedback regarding the API:
>
> 1) Configuration
>
> Configuration keys like
>
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>
> are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest to only allow a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).
>
> 2) Semantical declaration
>
> Regarding
>
> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>
> this is a semantical property of a function and should be defined
> per-function.
> It impacts the query result and potentially the behavior
> of planner rules.
>
> I see two options for this either: (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self
> contained and sufficient for planning.
>
> Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overwritten and return a set without this requirement if the user
> intents to do this.
>
> Thanks,
> Timo
>
> On 11.12.23 18:43, Piotr Nowojski wrote:
> > +1 to the idea, I don't have any comments.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg <asheinb...@confluent.io.invalid>
> > wrote:
> >
> >>>
> >>> Nicely written and makes sense. The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase." This naming seems to
> >>> imply/suggest that all Async functions are remote. I wonder if we can find
> >>> another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
> >>> and Async functions seems reasonable.)
> >>>
> >> Thanks. That's fair. I agree that "Remote" isn't always accurate. I
> >> believe that the python calls are also done asynchronously, so that might
> >> be a reasonable name, so long as there's no confusion between the base and
> >> async child class.
> >>
> >> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid>
> >> wrote:
> >>
> >>> Hi Alan,
> >>>
> >>> Nicely written and makes sense. The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase." This naming seems to
> >>> imply/suggest that all Async functions are remote. I wonder if we can find
> >>> another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
> >>> and Async functions seems reasonable.)
> >>>
> >>> Cheers,
> >>>
> >>> Jim
> >>>
> >>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> >>> <asheinb...@confluent.io.invalid> wrote:
> >>>
> >>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> >>>> asynchronous scalar function support [1]
> >>>>
> >>>> This feature proposes adding a new UDF type AsyncScalarFunction which is
> >>>> invoked just like a normal ScalarFunction, but is implemented with an
> >>>> asynchronous eval method. I had brought this up including the motivation
> >>>> in a previous discussion thread [2].
> >>>>
> >>>> The purpose is to achieve high throughput scalar function UDFs while
> >>>> allowing that an individual call may have high latency. It allows scaling
> >>>> up the parallelism of just these calls without having to increase the
> >>>> parallelism of the whole query (which could be rather resource
> >>>> inefficient).
> >>>>
> >>>> In practice, it should enable SQL integration with external services and
> >>>> systems, which Flink has limited support for at the moment. It should also
> >>>> allow easier integration with existing libraries which use asynchronous
> >>>> APIs.
> >>>>
> >>>> Looking forward to your feedback and suggestions.
> >>>>
> >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> >>>>
> >>>> [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> >>>>
> >>>> Thanks,
> >>>> Alan
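
For reference, option (b) from point 2 could look roughly like the sketch below. It only illustrates the idea discussed in this thread: the `ORDERED` requirement is proposed here and is not an existing Flink constant, and every type in the block (`FunctionRequirement`, `AsyncScalarFunctionSketch`, `OrderedLookup`, `UnorderedLookup`, `OutputModeSketch`) is a self-contained, hypothetical stand-in rather than a Flink class. The "manual override means out-of-order" reading is the starting point suggested above, not a settled decision.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Stand-in for Flink's FunctionRequirement enum. ORDERED is the requirement
// proposed in this thread (hypothetical, not an existing constant).
enum FunctionRequirement {
    ORDERED
}

// Stand-in for the proposed AsyncScalarFunction base class.
abstract class AsyncScalarFunctionSketch {
    // Default: ORDERED is required, so results stay correct without user action.
    public Set<FunctionRequirement> getRequirements() {
        return Collections.unmodifiableSet(EnumSet.of(FunctionRequirement.ORDERED));
    }
}

// A function that keeps the default and therefore preserves input order.
class OrderedLookup extends AsyncScalarFunctionSketch {}

// A function whose author prefers throughput over ordering and opts out
// by returning a requirement set without ORDERED.
class UnorderedLookup extends AsyncScalarFunctionSketch {
    @Override
    public Set<FunctionRequirement> getRequirements() {
        return Collections.emptySet();
    }
}

// Hypothetical planner-side check: out-of-order emission is allowed only
// when the function does not declare the ORDERED requirement.
final class OutputModeSketch {
    private OutputModeSketch() {}

    static boolean mayEmitOutOfOrder(AsyncScalarFunctionSketch fn) {
        return !fn.getRequirements().contains(FunctionRequirement.ORDERED);
    }
}
```

With this shape the planner needs nothing beyond the function definition itself: `OutputModeSketch.mayEmitOutOfOrder(new OrderedLookup())` is false, and the same call with `new UnorderedLookup()` is true.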