Hi Feng, Krzysztof

Sorry for the delayed response.

FLIP-313 seems to be a good solution for the TableFunction use case.
Ideally it would be nice to have async support for ScalarFunction,
AggregateFunction, and TableAggregateFunction as well, though that is a
lot of work.  ScalarFunction is what I'm most immediately interested in,
since it's such a common and straightforward interface for invoking user
functionality, especially for people who are less experienced in SQL.
If I wanted to implement some of this functionality, should I jump right
in with a FLIP?
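For concreteness, the async eval contract I have in mind can be simulated with plain CompletableFuture and no Flink on the classpath; the class and method names below are hypothetical, just mirroring the AsyncScalarFunction sketch further down this thread:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an async scalar UDF: the runtime hands eval() a
// future, and the function completes it whenever the remote call finishes.
public class AsyncEvalSketch {
    // Completes the result off the calling thread, as a remote call would.
    public static void eval(CompletableFuture<String> result, String input) {
        CompletableFuture.runAsync(() -> result.complete("sentiment:" + input));
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = new CompletableFuture<>();
        eval(result, "great product");
        // join() blocks until the async completion arrives.
        System.out.println(result.join());
    }
}
```

The point is only the shape of the contract: the caller supplies the future, the function returns immediately, and completion happens later on another thread.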

https://github.com/getindata/flink-http-connector: I looked through this
code, and it looks like a library that implements lookup joins whose
lookups translate into HTTP requests.  For many of the use cases I have
been testing, I have effectively done exactly this sort of thing, though
in a less generic way.  Thanks for pointing it out to me.
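The pattern I've been hand-rolling amounts to the sketch below (plain Java, no Flink or HTTP dependencies; `lookup` is a hypothetical stand-in for the connector's HTTP call): one async request per input key, all fired concurrently, with results gathered back in input order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of an async lookup join: one simulated "HTTP request" per key,
// all in flight at once, responses collected back in input order.
public class LookupJoinSketch {
    // Hypothetical stand-in for the connector's per-key HTTP lookup.
    public static CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> "resp-for-" + key);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c");
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (String key : keys) {
            inFlight.add(lookup(key)); // issue all requests concurrently
        }
        // Joining in input order preserves row order even when responses
        // complete out of order, at some cost to pipelining.
        for (CompletableFuture<String> f : inFlight) {
            System.out.println(f.join());
        }
    }
}
```

Concurrency gives the throughput; the ordered join-back is what keeps the output consistent with the input row order.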

-Alan

On Wed, Sep 27, 2023 at 12:18 PM Krzysztof Zarzycki <k.zarzy...@gmail.com>
wrote:

> Hello Alan,
> At my company we implemented an open source Flink HTTP connector, that you
> might find interesting. It can be represented as a source table as well
> and be used in lookups. Here is the link
> https://github.com/getindata/flink-http-connector
>
> Best
> Krzysztof
>
>
> On Thu, Sep 21, 2023 at 7:34 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>
> > Hi Alan
> >
> > I believe that supporting asynchronous UDFs is a valuable
> > feature. Currently, there is a similar FLIP [1] available.
> > Can this meet your needs?
> >
> > [1].
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> >
> >
> > Best,
> > Feng
> >
> > On Thu, Sep 21, 2023 at 1:12 PM Alan Sheinberg
> > <asheinb...@confluent.io.invalid> wrote:
> >
> > > Hi Ron,
> > >
> > > Thanks for your response.  I've answered some of your questions below.
> > >
> > > I think one solution is for the framework layer to support a
> > > Mini-Batch Lookup Join, doing one RPC call per batch of input rows,
> > > which can improve throughput.
> > >
> > >
> > > Would the idea be to collect a batch and then do a single RPC (or at
> > > least handle a number of RPCs in a single AsyncLookupFunction call)?
> > > That is an interesting idea and could simplify things.  For our use
> > > cases, technically, I could write an AsyncLookupFunction, utilize
> > > AsyncWaitOperator with unbatched RPCs, and do a Lookup Join without
> > > any issue.  My hesitation is that I'm afraid callers will find it
> > > unintuitive to join with a table where the underlying RPC is not
> > > modeled in that manner.  For example, it could be a text classifier
> > > IS_POSITIVE_SENTIMENT(...) where there's no backing table, just
> > > computation.
> > >
> > > how does the remote function help to solve your problem?
> > >
> > >
> > > The problem is pretty open-ended.  There are jobs where you want to
> > > join data with some external data source and inject it into your
> > > pipeline, but others might also be offloading some computation to an
> > > external system.  The external system might be owned by a different
> > > party, have different permissions, have different hardware to do a
> > > computation (e.g. train a model), or just block for a while.  The
> > > most intuitive invocation for this is just a scalar function in SQL.
> > > You just want it to be able to run at a high throughput.
> > >
> > > > Regarding implementing the Remote Function, can you go into more
> > > > detail about your idea, how we should support it, and how users
> > > > should use it, from API design to semantic explanation?
> > >
> > >
> > > The unimplemented option I gave the most thought to is 3).  You can
> > > imagine an AsyncScalarFunction definition and example class like:
> > >
> > > public class AsyncScalarFunction<T> extends UserDefinedFunction {
> > >   @Override
> > >   public final FunctionKind getKind() {
> > >     return FunctionKind.ASYNC_SCALAR;
> > >   }
> > >
> > >   @Override
> > >   public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> > >     return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
> > >   }
> > > }
> > >
> > > class MyScalarFunction extends AsyncScalarFunction<String> {
> > >   // Eval method with a future to use as a callback, plus arbitrary
> > >   // additional arguments.
> > >   public void eval(CompletableFuture<String> result, String input) {
> > >     // Example which uses an async http client.
> > >     AsyncHttpClient httpClient = new AsyncHttpClient();
> > >     // Do the request, then invoke the callback depending on the outcome.
> > >     Future<HttpResponse> responseFuture = httpClient.doPOST(getRequestBody(input));
> > >     responseFuture.handle((response, throwable) -> {
> > >       if (throwable != null) {
> > >         result.completeExceptionally(throwable);
> > >       } else {
> > >         result.complete(response.getBody());
> > >       }
> > >     });
> > >   }
> > >   ...
> > > }
> > >
> > > Then you can register it in your Flink program as with other UDFs
> > > and call it:
> > > tEnv.createTemporarySystemFunction("MY_FUNCTION", MyScalarFunction.class);
> > > TableResult result = tEnv.executeSql(
> > >     "SELECT MY_FUNCTION(input) FROM (SELECT i.input FROM Inputs i ORDER BY i.timestamp)");
> > >
> > > I know there are questions about SQL semantics to consider.  For
> > > example, does invocation of MY_FUNCTION preserve the order of the
> > > subquery above?  To be SQL compliant, I believe it must, so any async
> > > requests we send out must be output in order, regardless of when they
> > > complete.  There are probably other considerations as well.  This,
> > > for example, is already implemented as an option in AsyncWaitOperator.
> > >
> > > I could do a similar dive into option 2) if that would also be helpful,
> > > though maybe this is a good starting point for conversation.
> > >
> > > Hope that addressed your questions,
> > > Alan
> > >
> > > On Mon, Sep 18, 2023 at 6:51 PM liu ron <ron9....@gmail.com> wrote:
> > >
> > > > Hi, Alan
> > > >
> > > > Thanks for driving this proposal. It sounds interesting.
> > > > Regarding implementing the Remote Function, can you go into more
> > > > detail about your idea, how we should support it, and how users
> > > > should use it, from API design to semantic explanation?  And how
> > > > does the remote function help to solve your problem?
> > > >
> > > > I understand that your core pain point is the performance issues
> > > > caused by too many RPC calls in the three solutions you have
> > > > explored.  Regarding the Lookup Join cons:
> > > >
> > > > >> *Lookup Joins:*
> > > > Pros:
> > > > - Part of the Flink codebase
> > > > - High throughput
> > > > Cons:
> > > > - Unintuitive syntax
> > > > - Harder to do multiple remote calls per input row
> > > >
> > > > I think one solution is for the framework layer to support a
> > > > Mini-Batch Lookup Join, doing one RPC call per batch of input rows,
> > > > which can improve throughput.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Alan Sheinberg <asheinb...@confluent.io.invalid> wrote on Tue,
> > > > Sep 19, 2023 at 07:34:
> > > >
> > > > > Hello all,
> > > > >
> > > > > We want to implement a custom function that sends HTTP requests
> > > > > to a remote endpoint using Flink SQL. Even though the function
> > > > > will behave like a normal UDF, the runtime would issue calls
> > > > > asynchronously to achieve high throughput for these remote
> > > > > (potentially high latency) calls. What is the community's take
> > > > > on implementing greater support for such functions?  Any
> > > > > feedback is appreciated.
> > > > >
> > > > > What we have explored so far:
> > > > >
> > > > > 1.  Using a lookup join
> > > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > > > For example:
> > > > > CREATE TEMPORARY TABLE RemoteTable(table_lookup_key STRING, resp STRING,
> > > > >   PRIMARY KEY (table_lookup_key) NOT ENFORCED) WITH ('connector' = 'remote_call');
> > > > > SELECT i.table_lookup_key, resp FROM Inputs AS i JOIN RemoteTable
> > > > >   FOR SYSTEM_TIME AS OF i.proc_time AS r ON i.table_lookup_key = r.table_lookup_key;
> > > > >
> > > > > 2.  Using a polymorphic table function. Partially supported
> > > > > already for window functions
> > > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> > > > > For example:
> > > > > SELECT * FROM TABLE(REMOTE_CALL(Input => TABLE(TableToLookup) AS d,
> > > > >   Col => DESCRIPTOR("table_lookup_key")));
> > > > >
> > > > > 3.  Using an AsyncScalarFunction. Scalar functions are usually
> > > > > used as below (thus support for an async version of
> > > > > ScalarFunction is required):
> > > > > SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
> > > > >
> > > > > Some pros and cons for each approach:
> > > > >
> > > > > *Lookup Joins:*
> > > > > Pros:
> > > > > - Part of the Flink codebase
> > > > > - High throughput
> > > > > Cons:
> > > > > - Unintuitive syntax
> > > > > - Harder to do multiple remote calls per input row
> > > > >
> > > > > *PTFs:*
> > > > > Pros:
> > > > > - More intuitive syntax
> > > > > Cons:
> > > > > - Need to add more support in Flink. It may exist for
> > > > > specialized built-in functions, but not for user-defined ones
> > > > >
> > > > > *AsyncScalarFunction:*
> > > > > Pros:
> > > > > - Most intuitive syntax
> > > > > - Easy to do as many calls per row input as desired
> > > > > Cons:
> > > > > - Need to add support in Flink, including a new interface with
> > > > > an async eval method
> > > > > - Out-of-order results could pose issues with SQL semantics. If
> > > > > we output in order, throughput may suffer
> > > > >
> > > > > Thanks,
> > > > > Alan
> > > > >
> > > >
> > >
> >
>
