Hi Feng, Krzysztof,

Sorry for the delayed response.
FLIP-313 seems to be a good solution for the TableFunction use case. Ideally, it would also be nice to have async support for ScalarFunction, AggregateFunction, and TableAggregateFunction, though that is a lot of work. ScalarFunction is what I'm most immediately interested in, since it's such a common and straightforward interface for invoking user functionality, especially for people who are less experienced in SQL. If I wanted to implement some of this functionality, should I jump right in with a FLIP?

https://github.com/getindata/flink-http-connector: I looked through this code, and it looks like a library for performing lookup joins that translate into HTTP requests. For a lot of the use cases I have been testing, I have effectively done exactly this sort of thing, though in a less generic way. Thanks for pointing this out to me.

-Alan

On Wed, Sep 27, 2023 at 12:18 PM Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:

> Hello Alan,
> At my company we implemented an open source Flink HTTP connector that you
> might find interesting. It can be represented as a source table as well
> and be used in lookups. Here is the link:
> https://github.com/getindata/flink-http-connector
>
> Best,
> Krzysztof
>
> On Thu, Sep 21, 2023 at 7:34 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>
> > Hi Alan,
> >
> > I believe that supporting asynchronous UDFs is a valuable
> > feature. Currently, there is a similar FLIP [1] available.
> > Can this meet your needs?
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> >
> > Best,
> > Feng
> >
> > On Thu, Sep 21, 2023 at 1:12 PM Alan Sheinberg
> > <asheinb...@confluent.io.invalid> wrote:
> >
> > > Hi Ron,
> > >
> > > Thanks for your response. I've answered some of your questions below.
> > >
> > > > I think one solution is to support Mini-Batch Lookup Join by the
> > > > framework layer, doing an RPC call per batch of input rows, which
> > > > can improve throughput.
> > >
> > > Would the idea be to collect a batch and then do a single RPC (or at
> > > least handle a number of RPCs in a single AsyncLookupFunction call)?
> > > That is an interesting idea and could simplify things. For our use
> > > cases, technically, I can write an AsyncLookupFunction, utilize
> > > AsyncWaitOperator with unbatched RPCs, and do a Lookup Join without
> > > any issue. My hesitation is that I'm afraid callers will find it
> > > unintuitive to join with a table when the underlying RPC is not being
> > > modeled in that manner. For example, it could be a text classifier
> > > IS_POSITIVE_SENTIMENT(...) where there's no backing table, just
> > > computation.
> > >
> > > > how does the remote function help to solve your problem?
> > >
> > > The problem is pretty open-ended. There are jobs where you want to
> > > join data with some external data source and inject it into your
> > > pipeline, but others might also be offloading some computation to an
> > > external system. The external system might be owned by a different
> > > party, have different permissions, have different hardware to do a
> > > computation (e.g. train a model), or just block for a while. The most
> > > intuitive invocation for this is just a scalar function in SQL. You
> > > just want it to be able to run at a high throughput.
> > >
> > > > Regarding implementing the Remote Function, can you go into more
> > > > detail about your idea, how we should support it, and how users
> > > > should use it, from API design to semantic explanation?
> > >
> > > The unimplemented option I gave the most thought to is 3).
> > > You can imagine
> > > an AsyncScalarFunction definition and example class like:
> > >
> > > public class AsyncScalarFunction<T> extends UserDefinedFunction {
> > >   @Override
> > >   public final FunctionKind getKind() {
> > >     return FunctionKind.ASYNC_SCALAR;
> > >   }
> > >
> > >   @Override
> > >   public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> > >     return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory,
> > >         getClass());
> > >   }
> > > }
> > >
> > > class MyScalarFunction extends AsyncScalarFunction<String> {
> > >   // Eval method with a future to use as a callback, with arbitrary
> > >   // additional arguments.
> > >   public void eval(CompletableFuture<String> result, String input) {
> > >     // Example which uses an async HTTP client.
> > >     AsyncHttpClient httpClient = new AsyncHttpClient();
> > >     // Do the request, then invoke the callback depending on the outcome.
> > >     CompletableFuture<HttpResponse> responseFuture =
> > >         httpClient.doPOST(getRequestBody(input));
> > >     responseFuture.handle((response, throwable) -> {
> > >       if (throwable != null) {
> > >         result.completeExceptionally(throwable);
> > >       } else {
> > >         result.complete(response.getBody());
> > >       }
> > >     });
> > >   }
> > >   ...
> > > }
> > >
> > > Then you can register it in your Flink program as with other UDFs and
> > > call it:
> > >
> > > tEnv.createTemporarySystemFunction("MY_FUNCTION", MyScalarFunction.class);
> > > TableResult result = tEnv.executeSql(
> > >     "SELECT MY_FUNCTION(input) FROM (SELECT i.input FROM Inputs i ORDER BY i.timestamp)");
> > >
> > > I know there are questions about SQL semantics to consider. For
> > > example, does invocation of MY_FUNCTION preserve the order of the
> > > subquery above? To be SQL compliant, I believe it must, so any async
> > > requests we send out must be output in order, regardless of when they
> > > complete. There are probably other considerations as well.
> > > This, for example, is already implemented as an option in
> > > AsyncWaitOperator.
> > >
> > > I could do a similar dive into option 2) if that would also be
> > > helpful, though maybe this is a good starting point for conversation.
> > >
> > > Hope that addressed your questions,
> > > Alan
> > >
> > > On Mon, Sep 18, 2023 at 6:51 PM liu ron <ron9....@gmail.com> wrote:
> > >
> > > > Hi, Alan
> > > >
> > > > Thanks for driving this proposal. It sounds interesting.
> > > > Regarding implementing the Remote Function, can you go into more
> > > > detail about your idea, how we should support it, and how users
> > > > should use it, from API design to semantic explanation? And how
> > > > does the remote function help to solve your problem?
> > > >
> > > > I understand that your core pain point is that there are
> > > > performance issues with too many RPC calls. For the three solutions
> > > > you have explored, regarding the Lookup Join cons:
> > > >
> > > > >> *Lookup Joins:*
> > > > Pros:
> > > > - Part of the Flink codebase
> > > > - High throughput
> > > > Cons:
> > > > - Unintuitive syntax
> > > > - Harder to do multiple remote calls per input row
> > > >
> > > > I think one solution is to support Mini-Batch Lookup Join by the
> > > > framework layer, doing an RPC call per batch of input rows, which
> > > > can improve throughput.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Alan Sheinberg <asheinb...@confluent.io.invalid> wrote on Tue,
> > > > Sep 19, 2023 at 07:34:
> > > >
> > > > > Hello all,
> > > > >
> > > > > We want to implement a custom function that sends HTTP requests
> > > > > to a remote endpoint using Flink SQL. Even though the function
> > > > > will behave like a normal UDF, the runtime would issue calls
> > > > > asynchronously to achieve high throughput for these remote
> > > > > (potentially high latency) calls. What is the community's take on
> > > > > implementing greater support for such functions?
> > > > > Any feedback is appreciated.
> > > > >
> > > > > What we have explored so far:
> > > > >
> > > > > 1. Using a lookup join
> > > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > > > For example:
> > > > >
> > > > > CREATE TEMPORARY TABLE RemoteTable(table_lookup_key STRING, resp
> > > > > STRING, PRIMARY KEY (table_lookup_key) NOT ENFORCED)
> > > > > WITH ('connector' = 'remote_call');
> > > > >
> > > > > SELECT i.table_lookup_key, resp FROM Inputs AS i JOIN RemoteTable
> > > > > FOR SYSTEM_TIME AS OF i.proc_time AS r
> > > > > ON i.table_lookup_key = r.table_lookup_key;
> > > > >
> > > > > 2. Using a polymorphic table function. Partially supported
> > > > > already for window functions
> > > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> > > > > For example:
> > > > >
> > > > > SELECT * FROM TABLE(REMOTE_CALL(Input => TABLE(TableToLookup) AS
> > > > > d, Col => DESCRIPTOR(table_lookup_key)));
> > > > >
> > > > > 3. Using an AsyncScalarFunction. Scalar functions are usually
> > > > > used as below (thus support for an async version of
> > > > > ScalarFunction is required):
> > > > >
> > > > > SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
> > > > >
> > > > > Some pros and cons for each approach:
> > > > >
> > > > > *Lookup Joins:*
> > > > > Pros:
> > > > > - Part of the Flink codebase
> > > > > - High throughput
> > > > > Cons:
> > > > > - Unintuitive syntax
> > > > > - Harder to do multiple remote calls per input row
> > > > >
> > > > > *PTFs:*
> > > > > Pros:
> > > > > - More intuitive syntax
> > > > > Cons:
> > > > > - Need to add more support in Flink. It may exist for specialized
> > > > > built-in functions, but not for user-defined ones
> > > > >
> > > > > *AsyncScalarFunction:*
> > > > > Pros:
> > > > > - Most intuitive syntax
> > > > > - Easy to do as many calls per input row as desired
> > > > > Cons:
> > > > > - Need to add support in Flink, including a new interface with an
> > > > > async eval method
> > > > > - Out-of-order results could pose issues with SQL semantics. If
> > > > > we output in order, throughput may suffer
> > > > >
> > > > > Thanks,
> > > > > Alan
> > > > >
> > > >
> > >
> >
>