Hello Alan,

At my company we implemented an open source Flink HTTP connector that you might find interesting. It can be represented as a source table as well and be used in lookups. Here is the link:
https://github.com/getindata/flink-http-connector
Best,
Krzysztof

On Thu, Sep 21, 2023 at 7:34 AM Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Alan,
>
> I believe that supporting asynchronous UDFs would be a valuable feature.
> Currently, there is a similar FLIP [1] available. Can this meet your
> needs?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
>
> Best,
> Feng
>
> On Thu, Sep 21, 2023 at 1:12 PM Alan Sheinberg
> <asheinb...@confluent.io.invalid> wrote:
>
> > Hi Ron,
> >
> > Thanks for your response. I've answered some of your questions below.
> >
> > > I think one solution is to support mini-batch lookup join at the
> > > framework layer, doing one RPC call per batch of input rows, which
> > > can improve throughput.
> >
> > Would the idea be to collect a batch and then do a single RPC (or at
> > least handle a number of RPCs in a single AsyncLookupFunction call)?
> > That is an interesting idea and could simplify things. For our use
> > cases, technically, I can write an AsyncLookupFunction and utilize
> > AsyncWaitOperator using unbatched RPCs and do a lookup join without
> > any issue. My hesitation is that I'm afraid that callers will find it
> > unintuitive to join with a table where the underlying RPC is not being
> > modeled in that manner. For example, it could be a text classifier
> > IS_POSITIVE_SENTIMENT(...) where there's no backing table, just
> > computation.
> >
> > > How does the remote function help to solve your problem?
> >
> > The problem is pretty open-ended. There are jobs where you want to
> > join data with some external data source and inject it into your
> > pipeline, but others might also be offloading some computation to an
> > external system. The external system might be owned by a different
> > party, have different permissions, have different hardware to do a
> > computation (e.g. train a model), or just block for a while. The most
> > intuitive invocation for this is just a scalar function in SQL.
> > You just want it to be able to run at a high throughput.
> >
> > > Regarding implementing the Remote Function, can you go into more
> > > detail about your idea, how we should support it, and how users
> > > should use it, from API design to semantic explanation? And how does
> > > the remote function help to solve your problem?
> >
> > The unimplemented option I gave the most thought to is 3). You can
> > imagine an AsyncScalarFunction definition and example class like:
> >
> > public class AsyncScalarFunction<T> extends UserDefinedFunction {
> >   @Override
> >   public final FunctionKind getKind() {
> >     return FunctionKind.ASYNC_SCALAR;
> >   }
> >
> >   @Override
> >   public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> >     return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
> >   }
> > }
> >
> > class MyScalarFunction extends AsyncScalarFunction<String> {
> >   // Eval method with a future to use as a callback, with arbitrary
> >   // additional arguments
> >   public void eval(CompletableFuture<String> result, String input) {
> >     // Example which uses an async http client
> >     AsyncHttpClient httpClient = new AsyncHttpClient();
> >     // Do the request and then invoke the callback depending on the outcome.
> >     Future<HttpResponse> responseFuture = httpClient.doPOST(getRequestBody(input));
> >     responseFuture.handle((response, throwable) -> {
> >       if (throwable != null) {
> >         result.completeExceptionally(throwable);
> >       } else {
> >         result.complete(response.getBody());
> >       }
> >     });
> >   }
> >   ...
> > }
> >
> > Then you can register it in your Flink program as with other UDFs and
> > call it:
> >
> > tEnv.createTemporarySystemFunction("MY_FUNCTION", MyScalarFunction.class);
> > TableResult result = tEnv.executeSql("SELECT MY_FUNCTION(input) FROM
> >     (SELECT i.input FROM Inputs i ORDER BY i.timestamp)");
> >
> > I know there are questions about SQL semantics to consider. For
> > example, does invocation of MY_FUNCTION preserve the order of the
> > subquery above?
> > To be SQL compliant, I believe it must, so any async request we send
> > out must be output in order, regardless of when the requests complete.
> > There are probably other considerations as well. This, for example, is
> > already implemented as an option in AsyncWaitOperator.
> >
> > I could do a similar dive into option 2) if that would also be
> > helpful, though maybe this is a good starting point for conversation.
> >
> > Hope that addressed your questions,
> > Alan
> >
> > On Mon, Sep 18, 2023 at 6:51 PM liu ron <ron9....@gmail.com> wrote:
> >
> > > Hi, Alan
> > >
> > > Thanks for driving this proposal. It sounds interesting.
> > > Regarding implementing the Remote Function, can you go into more
> > > detail about your idea, how we should support it, and how users
> > > should use it, from API design to semantic explanation? And how does
> > > the remote function help to solve your problem?
> > >
> > > I understand that your core pain point is that there are performance
> > > issues with too many RPC calls. Regarding the three solutions you
> > > have explored, and the lookup join cons in particular:
> > >
> > > >> *Lookup Joins:*
> > > >> Pros:
> > > >> - Part of the Flink codebase
> > > >> - High throughput
> > > >> Cons:
> > > >> - Unintuitive syntax
> > > >> - Harder to do multiple remote calls per input row
> > >
> > > I think one solution is to support mini-batch lookup join at the
> > > framework layer, doing one RPC call per batch of input rows, which
> > > can improve throughput.
> > >
> > > Best,
> > > Ron
> > >
> > > Alan Sheinberg <asheinb...@confluent.io.invalid> wrote on Tue, Sep
> > > 19, 2023 at 07:34:
> > >
> > > > Hello all,
> > > >
> > > > We want to implement a custom function that sends HTTP requests to
> > > > a remote endpoint using Flink SQL. Even though the function will
> > > > behave like a normal UDF, the runtime would issue calls
> > > > asynchronously to achieve high throughput for these remote
> > > > (potentially high latency) calls.
> > > > What is the community's take on implementing greater support for
> > > > such functions? Any feedback is appreciated.
> > > >
> > > > What we have explored so far:
> > > >
> > > > 1. Using a lookup join
> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > > For example:
> > > >
> > > > CREATE TEMPORARY TABLE RemoteTable(table_lookup_key STRING, resp
> > > > STRING, PRIMARY KEY (table_lookup_key) NOT ENFORCED) WITH
> > > > ('connector' = 'remote_call');
> > > >
> > > > SELECT i.table_lookup_key, resp FROM Inputs AS i JOIN RemoteTable
> > > > FOR SYSTEM_TIME AS OF i.proc_time AS r ON i.table_lookup_key =
> > > > r.table_lookup_key;
> > > >
> > > > 2. Using a polymorphic table function. Partially supported already
> > > > for window functions
> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> > > > For example:
> > > >
> > > > SELECT * FROM TABLE (REMOTE_CALL (Input => TABLE(TableToLookup) AS
> > > > d, Col => DESCRIPTOR("table_lookup_key")));
> > > >
> > > > 3. Using an AsyncScalarFunction. Scalar functions are usually used
> > > > as below (thus support for an async version of ScalarFunction is
> > > > required):
> > > >
> > > > SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
> > > >
> > > > Some pros and cons for each approach:
> > > >
> > > > *Lookup Joins:*
> > > > Pros:
> > > > - Part of the Flink codebase
> > > > - High throughput
> > > > Cons:
> > > > - Unintuitive syntax
> > > > - Harder to do multiple remote calls per input row
> > > >
> > > > *PTFs:*
> > > > Pros:
> > > > - More intuitive syntax
> > > > Cons:
> > > > - Need to add more support in Flink.
> > > > It may exist for specialized built-in functions, but not for
> > > > user-defined ones.
> > > >
> > > > *AsyncScalarFunction:*
> > > > Pros:
> > > > - Most intuitive syntax
> > > > - Easy to do as many calls per input row as desired
> > > > Cons:
> > > > - Need to add support in Flink, including a new interface with an
> > > > async eval method
> > > > - Out-of-order results could pose issues with SQL semantics. If we
> > > > output in order, throughput may suffer
> > > >
> > > > Thanks,
> > > > Alan
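[Editor's note] The ordered-output concern raised in the thread (async requests may complete out of order, but results must be emitted in input order, as AsyncWaitOperator's ordered mode does) can be sketched outside of Flink with plain `CompletableFuture`s. This is a minimal illustration, not Flink code: the class name, the `resp:` prefix, and the simulated latencies are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of ordered emission for async calls: requests are fired
 * concurrently and may complete in any order, but a FIFO queue of
 * futures is drained in submission order, so output order matches
 * input order. (Illustrative only; not the Flink implementation.)
 */
public class OrderedAsyncSketch {
    public static List<String> processOrdered(List<String> inputs) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        Queue<CompletableFuture<String>> inFlight = new ArrayDeque<>();
        // Fire all "requests"; earlier inputs are given LONGER simulated
        // latency, so they complete last.
        for (int i = 0; i < inputs.size(); i++) {
            String input = inputs.get(i);
            CompletableFuture<String> f = new CompletableFuture<>();
            long delayMs = (inputs.size() - i) * 50L;
            timer.schedule(() -> f.complete("resp:" + input), delayMs, TimeUnit.MILLISECONDS);
            inFlight.add(f);
        }
        // Drain in submission order: the head blocks until its result is
        // ready, even though later futures already completed.
        List<String> out = new ArrayList<>();
        while (!inFlight.isEmpty()) {
            out.add(inFlight.poll().join());
        }
        timer.shutdown();
        return out;
    }

    public static void main(String[] args) {
        // Completion order is c, b, a; emission order is still a, b, c.
        System.out.println(processOrdered(List.of("a", "b", "c")));
    }
}
```

The trade-off Alan mentions is visible here: total latency is bounded by the slowest in-flight head-of-line request, which is the throughput cost of preserving order.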