Hello Alan,
At my company we implemented an open-source Flink HTTP connector that you
might find interesting. It can be represented as a source table and
used in lookups. Here is the link:
https://github.com/getindata/flink-http-connector

Best
Krzysztof


On Thu, Sep 21, 2023 at 7:34 AM Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Alan
>
> I believe that supporting asynchronous UDFs is a valuable
> feature. Currently, there is a similar FLIP [1] available.
> Can this meet your needs?
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
>
>
> Best,
> Feng
>
> On Thu, Sep 21, 2023 at 1:12 PM Alan Sheinberg
> <asheinb...@confluent.io.invalid> wrote:
>
> > Hi Ron,
> >
> > Thanks for your response.  I've answered some of your questions below.
> >
> > > I think one solution is to support Mini-Batch Lookup Join in the framework
> > > layer, doing one RPC call per batch of input rows, which can improve
> > > throughput.
> >
> >
> > Would the idea be to collect a batch and then do a single RPC (or at least
> > handle a number of RPCs in a single AsyncLookupFunction call)?  That is an
> > interesting idea and could simplify things.  For our use cases,
> > technically, I can write an AsyncLookupFunction and utilize
> > AsyncWaitOperator using unbatched RPCs and do a Lookup Join without any
> > issue. My hesitation is that I'm afraid that callers will find it
> > unintuitive to join with a table where the underlying RPC is not being
> > modeled in that manner.  For example, it could be a text classifier
> > IS_POSITIVE_SENTIMENT(...) where there's no backing table, just
> > computation.
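
The batching idea discussed above can be sketched in plain Java, independent of Flink. `MiniBatcher`, its `lookup` and `flush` methods, and the `batchRpc` callback are hypothetical names used only for illustration and are not part of any Flink API. A real operator would presumably also need a timer-based flush and checkpoint handling, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical mini-batcher (not a Flink class): groups lookups into one batched RPC. */
public class MiniBatcher<K, V> {
    private final int batchSize;
    // Assumed shape of the remote call: many keys in, one map of results out.
    private final Function<List<K>, CompletableFuture<Map<K, V>>> batchRpc;
    private final List<K> keys = new ArrayList<>();
    private final List<CompletableFuture<V>> results = new ArrayList<>();

    public MiniBatcher(int batchSize,
                       Function<List<K>, CompletableFuture<Map<K, V>>> batchRpc) {
        this.batchSize = batchSize;
        this.batchRpc = batchRpc;
    }

    /** Buffers one key; the returned future completes when its batch returns. */
    public synchronized CompletableFuture<V> lookup(K key) {
        CompletableFuture<V> result = new CompletableFuture<>();
        keys.add(key);
        results.add(result);
        if (keys.size() >= batchSize) {
            flush();
        }
        return result;
    }

    /** Sends everything buffered so far as a single RPC. */
    public synchronized void flush() {
        if (keys.isEmpty()) {
            return;
        }
        List<K> batchKeys = new ArrayList<>(keys);
        List<CompletableFuture<V>> batchResults = new ArrayList<>(results);
        keys.clear();
        results.clear();
        batchRpc.apply(batchKeys).whenComplete((values, error) -> {
            // Fan the single batched response back out to the per-row futures.
            for (int i = 0; i < batchKeys.size(); i++) {
                if (error != null) {
                    batchResults.get(i).completeExceptionally(error);
                } else {
                    batchResults.get(i).complete(values.get(batchKeys.get(i)));
                }
            }
        });
    }
}
```

With a batch size of 2, two `lookup` calls result in a single invocation of `batchRpc`; the first future stays incomplete until the second key arrives or `flush()` is called explicitly.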
> >
> > > How does the remote function help to solve your problem?
> >
> >
> > The problem is pretty open-ended.  There are jobs where you want to join
> > data with some external data source and inject it into your pipeline, but
> > others might also be offloading some computation to an external system.
> > The external system might be owned by a different party, have different
> > permissions, have different hardware to do a computation (e.g. train a
> > model), or just block for a while.  The most intuitive invocation for this
> > is just a scalar function in SQL.  You just want it to be able to run at a
> > high throughput.
> >
> > > Regarding implementing the Remote Function, can you go into more detail
> > > about your idea, how we should support it, and how users should use it,
> > > from API design to semantic explanation?
> >
> >
> > The unimplemented option I gave the most thought to is 3).  You can imagine
> > an AsyncScalarFunction definition and example class like:
> >
> > public class AsyncScalarFunction<T> extends UserDefinedFunction {
> >   @Override
> >   public final FunctionKind getKind() {
> >     return FunctionKind.ASYNC_SCALAR;
> >   }
> >
> >   @Override
> >   public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> >     return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
> >   }
> > }
> >
> > class MyScalarFunction extends AsyncScalarFunction<String> {
> >   // Eval method with a future to use as a callback, with arbitrary
> >   // additional arguments
> >   public void eval(CompletableFuture<String> result, String input) {
> >     // Example which uses an async http client
> >     AsyncHttpClient httpClient = new AsyncHttpClient();
> >     // Do the request and then invoke the callback depending on the outcome.
> >     CompletableFuture<HttpResponse> responseFuture =
> >         httpClient.doPOST(getRequestBody(input));
> >     responseFuture.whenComplete((response, throwable) -> {
> >       if (throwable != null) {
> >         result.completeExceptionally(throwable);
> >       } else {
> >         result.complete(response.getBody());
> >       }
> >     });
> >   }
> >   ...
> > }
> >
> > Then you can register it in your Flink program as with other UDFs and call
> > it:
> > tEnv.createTemporarySystemFunction("MY_FUNCTION", MyScalarFunction.class);
> > TableResult result = tEnv.executeSql(
> >     "SELECT MY_FUNCTION(input) FROM "
> >         + "(SELECT i.input FROM Inputs i ORDER BY i.timestamp)");
> >
> > I know there are questions about SQL semantics to consider.  For example,
> > does invocation of MY_FUNCTION preserve the order of the subquery above?
> > To be SQL compliant, I believe it must, so the results of any async
> > requests we send out must be output in order, regardless of when they
> > complete.  There are probably other considerations as well.  Ordered
> > output, for example, is already implemented as an option in
> > AsyncWaitOperator.
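
The ordering concern above can be illustrated with a minimal plain-Java sketch. It is not how AsyncWaitOperator is implemented; `OrderedEmitter` and its methods are hypothetical names, and the sketch only shows the general idea of holding back results until every earlier request has finished.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper (not a Flink class): emits async results in submission order. */
public class OrderedEmitter<T> {
    private final Queue<CompletableFuture<T>> pending = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();

    /** Registers one request; the caller's async client completes the returned future. */
    public synchronized CompletableFuture<T> submit() {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.add(future);
        future.whenComplete((value, error) -> drain());
        return future;
    }

    /** Emits results from the head of the queue for as long as the head is finished. */
    private synchronized void drain() {
        while (!pending.isEmpty() && pending.peek().isDone()) {
            CompletableFuture<T> head = pending.poll();
            // Failed requests are simply dropped in this sketch; a real
            // operator would more likely retry or fail the job.
            if (!head.isCompletedExceptionally()) {
                emitted.add(head.join());
            }
        }
    }

    /** Returns a copy of everything emitted so far, in output order. */
    public synchronized List<T> emitted() {
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        OrderedEmitter<String> emitter = new OrderedEmitter<>();
        CompletableFuture<String> a = emitter.submit();
        CompletableFuture<String> b = emitter.submit();
        CompletableFuture<String> c = emitter.submit();
        c.complete("c"); // finishes first but has to wait for a and b
        b.complete("b");
        System.out.println(emitter.emitted()); // prints []
        a.complete("a");
        System.out.println(emitter.emitted()); // prints [a, b, c]
    }
}
```

The sketch also shows where the throughput cost comes from: one slow request at the head of the queue delays every result behind it, which is the trade-off between ordered and unordered output.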
> >
> > I could do a similar dive into option 2) if that would also be helpful,
> > though maybe this is a good starting point for conversation.
> >
> > Hope that addressed your questions,
> > Alan
> >
> > On Mon, Sep 18, 2023 at 6:51 PM liu ron <ron9....@gmail.com> wrote:
> >
> > > Hi, Alan
> > >
> > > Thanks for driving this proposal. It sounds interesting.
> > > Regarding implementing the Remote Function, can you go into more detail
> > > about your idea, how we should support it, and how users should use it,
> > > from API design to semantic explanation? And how does the remote function
> > > help to solve your problem?
> > >
> > > I understand that your core pain point is that there are performance issues
> > > with too many RPC calls. Regarding the Lookup Join cons of the three
> > > solutions you have explored:
> > >
> > > >> *Lookup Joins:*
> > > Pros:
> > > - Part of the Flink codebase
> > > - High throughput
> > > Cons:
> > > - Unintuitive syntax
> > > - Harder to do multiple remote calls per input row
> > >
> > > I think one solution is to support Mini-Batch Lookup Join in the framework
> > > layer, doing one RPC call per batch of input rows, which can improve
> > > throughput.
> > >
> > > Best,
> > > Ron
> > >
> > > On Tue, Sep 19, 2023 at 7:34 AM Alan Sheinberg
> > > <asheinb...@confluent.io.invalid> wrote:
> > >
> > > > Hello all,
> > > >
> > > > We want to implement a custom function that sends HTTP requests to a remote
> > > > endpoint using Flink SQL. Even though the function will behave like a
> > > > normal UDF, the runtime would issue calls asynchronously to achieve high
> > > > throughput for these remote (potentially high latency) calls. What is the
> > > > community's take on implementing greater support for such functions? Any
> > > > feedback is appreciated.
> > > >
> > > > What we have explored so far:
> > > >
> > > > 1.  Using a lookup join
> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > > For example:
> > > > CREATE TEMPORARY TABLE RemoteTable(table_lookup_key string, resp string,
> > > > PRIMARY KEY (table_lookup_key) NOT ENFORCED) WITH ('connector' =
> > > > 'remote_call');
> > > > SELECT i.table_lookup_key, resp FROM Inputs AS i JOIN RemoteTable
> > > > FOR SYSTEM_TIME AS OF i.proc_time AS r ON i.table_lookup_key =
> > > > r.table_lookup_key;
> > > >
> > > > 2.  Using a polymorphic table function. Partially supported already for
> > > > window functions
> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> > > > For example:
> > > > SELECT * FROM TABLE (REMOTE_CALL (Input => Table(TableToLookup) as d,
> > > > Col => DESCRIPTOR("table_lookup_key")));
> > > >
> > > > 3.  Using an AsyncScalarFunction. Scalar functions are usually used as
> > > > below (thus support for an async version of ScalarFunction is required):
> > > > SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
> > > >
> > > > Some pros and cons for each approach:
> > > >
> > > > *Lookup Joins:*
> > > > Pros:
> > > > - Part of the Flink codebase
> > > > - High throughput
> > > > Cons:
> > > > - Unintuitive syntax
> > > > - Harder to do multiple remote calls per input row
> > > >
> > > > *PTFs:*
> > > > Pros:
> > > > - More intuitive syntax
> > > > Cons:
> > > > - Need to add more support in Flink. It may exist for specialized built-in
> > > > functions, but not for user defined ones
> > > >
> > > > *AsyncScalarFunction:*
> > > > Pros:
> > > > - Most intuitive syntax
> > > > - Easy to do as many calls per row input as desired
> > > > Cons:
> > > > - Need to add support in Flink, including a new interface with an async
> > > > eval method
> > > > - Out of order results could pose issues with SQL semantics. If we output
> > > > in order, the throughput performance may suffer
> > > >
> > > > Thanks,
> > > > Alan
> > > >
> > >
> >
>
