AlanConfluent commented on code in PR #26610:
URL: https://github.com/apache/flink/pull/26610#discussion_r2231452078
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+#### Asynchronous Semantics
+While calls to an `AsyncScalarFunction` may be completed out of the original
input order, to maintain correct semantics, the outputs of the function are
guaranteed to maintain that input order to downstream components of the query.
The data itself could reveal completion order (e.g. by containing fetch
timestamps), so the user should consider whether this is acceptable for their
use-case.
+
+#### Error Handling
+The primary way for a user to indicate an error is to call
`completableFuture.completeExceptionally(throwable)`. Similarly, if an
exception is encountered by the system when invoking `eval`, that will also
result in an error. When an error occurs, the system will consider the retry
strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is
`NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of
[`table.exec.async-scalar.retry-delay`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be
waited, and the function call will be retried. If there have been
[`table.exec.async-scalar.max-attempts`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed
attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including
all retry attempts), the
job will fail.
+
+#### Is AsyncScalarFunction Required?
+One thing to consider is if the UDF contains CPU intensive logic with no
blocking calls. If so, it likely doesn't require asynchronous functionality
and could use a `ScalarFunction`. If the logic involves waiting for things like
network or background operations (e.g. database lookups, RPCs, or REST calls),
this may be a useful way to speed things up.
Review Comment:
Makes sense. I didn't enumerate all of the differences, since this is
likely a moving target, but said that when in doubt ScalarFunction should be
used.
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+#### Asynchronous Semantics
+While calls to an `AsyncScalarFunction` may be completed out of the original
input order, to maintain correct semantics, the outputs of the function are
guaranteed to maintain that input order to downstream components of the query.
The data itself could reveal completion order (e.g. by containing fetch
timestamps), so the user should consider whether this is acceptable for their
use-case.
+
+#### Error Handling
+The primary way for a user to indicate an error is to call
`completableFuture.completeExceptionally(throwable)`. Similarly, if an
exception is encountered by the system when invoking `eval`, that will also
result in an error. When an error occurs, the system will consider the retry
strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is
`NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of
[`table.exec.async-scalar.retry-delay`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be
waited, and the function call will be retried. If there have been
[`table.exec.async-scalar.max-attempts`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed
attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including
all retry attempts), the
job will fail.
+
+#### Is AsyncScalarFunction Required?
+One thing to consider is if the UDF contains CPU intensive logic with no
blocking calls. If so, it likely doesn't require asynchronous functionality
and could use a `ScalarFunction`. If the logic involves waiting for things like
network or background operations (e.g. database lookups, RPCs, or REST calls),
this may be a useful way to speed things up.
+
+
+The following example shows how to do work on a thread pool in the background,
though any libraries exposing an async interface may be directly used to
complete the `CompletableFuture` from a callback. See the [Implementation
Guide](#implementation-guide) for more details.
Review Comment:
Done
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+#### Asynchronous Semantics
+While calls to an `AsyncScalarFunction` may be completed out of the original
input order, to maintain correct semantics, the outputs of the function are
guaranteed to maintain that input order to downstream components of the query.
The data itself could reveal completion order (e.g. by containing fetch
timestamps), so the user should consider whether this is acceptable for their
use-case.
+
+#### Error Handling
+The primary way for a user to indicate an error is to call
`completableFuture.completeExceptionally(throwable)`. Similarly, if an
exception is encountered by the system when invoking `eval`, that will also
result in an error. When an error occurs, the system will consider the retry
strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is
`NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of
[`table.exec.async-scalar.retry-delay`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be
waited, and the function call will be retried. If there have been
[`table.exec.async-scalar.max-attempts`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed
attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including
all retry attempts), the
job will fail.
+
+#### Is AsyncScalarFunction Required?
+One thing to consider is if the UDF contains CPU intensive logic with no
blocking calls. If so, it likely doesn't require asynchronous functionality
and could use a `ScalarFunction`. If the logic involves waiting for things like
network or background operations (e.g. database lookups, RPCs, or REST calls),
this may be a useful way to speed things up.
+
+
+The following example shows how to do work on a thread pool in the background,
though any libraries exposing an async interface may be directly used to
complete the `CompletableFuture` from a callback. See the [Implementation
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class BackgroundFunction extends AsyncScalarFunction {
+ private Executor executor;
+
+ @Override
+ public void open(FunctionContext context) {
+ executor = Executors.newFixedThreadPool(10);
+ }
+
+ // take any data type and return INT
Review Comment:
Removed
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+#### Asynchronous Semantics
+While calls to an `AsyncScalarFunction` may be completed out of the original
input order, to maintain correct semantics, the outputs of the function are
guaranteed to maintain that input order to downstream components of the query.
The data itself could reveal completion order (e.g. by containing fetch
timestamps), so the user should consider whether this is acceptable for their
use-case.
+
+#### Error Handling
+The primary way for a user to indicate an error is to call
`completableFuture.completeExceptionally(throwable)`. Similarly, if an
exception is encountered by the system when invoking `eval`, that will also
result in an error. When an error occurs, the system will consider the retry
strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is
`NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of
[`table.exec.async-scalar.retry-delay`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be
waited, and the function call will be retried. If there have been
[`table.exec.async-scalar.max-attempts`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed
attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including
all retry attempts), the
job will fail.
+
+#### Is AsyncScalarFunction Required?
+One thing to consider is if the UDF contains CPU intensive logic with no
blocking calls. If so, it likely doesn't require asynchronous functionality
and could use a `ScalarFunction`. If the logic involves waiting for things like
network or background operations (e.g. database lookups, RPCs, or REST calls),
this may be a useful way to speed things up.
+
+
+The following example shows how to do work on a thread pool in the background,
though any libraries exposing an async interface may be directly used to
complete the `CompletableFuture` from a callback. See the [Implementation
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class BackgroundFunction extends AsyncScalarFunction {
Review Comment:
Done
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+#### Asynchronous Semantics
+While calls to an `AsyncScalarFunction` may be completed out of the original
input order, to maintain correct semantics, the outputs of the function are
guaranteed to maintain that input order to downstream components of the query.
The data itself could reveal completion order (e.g. by containing fetch
timestamps), so the user should consider whether this is acceptable for their
use-case.
+
+#### Error Handling
+The primary way for a user to indicate an error is to call
`completableFuture.completeExceptionally(throwable)`. Similarly, if an
exception is encountered by the system when invoking `eval`, that will also
result in an error. When an error occurs, the system will consider the retry
strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is
`NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of
[`table.exec.async-scalar.retry-delay`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be
waited, and the function call will be retried. If there have been
[`table.exec.async-scalar.max-attempts`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed
attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including
all retry attempts), the
job will fail.
+
+#### Is AsyncScalarFunction Required?
+One thing to consider is if the UDF contains CPU intensive logic with no
blocking calls. If so, it likely doesn't require asynchronous functionality
and could use a `ScalarFunction`. If the logic involves waiting for things like
network or background operations (e.g. database lookups, RPCs, or REST calls),
this may be a useful way to speed things up.
+
+
+The following example shows how to do work on a thread pool in the background,
though any libraries exposing an async interface may be directly used to
complete the `CompletableFuture` from a callback. See the [Implementation
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class BackgroundFunction extends AsyncScalarFunction {
+ private Executor executor;
+
+ @Override
+ public void open(FunctionContext context) {
+ executor = Executors.newFixedThreadPool(10);
Review Comment:
I added some more comments, and improved the function, which honestly was a
bit too contrived. Instead, I made a BeverageNameLookupFunction and went
deeper with a database lookup example. Hopefully this is much more concrete.
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+#### Asynchronous Semantics
+While calls to an `AsyncScalarFunction` may be completed out of the original
input order, to maintain correct semantics, the outputs of the function are
guaranteed to maintain that input order to downstream components of the query.
The data itself could reveal completion order (e.g. by containing fetch
timestamps), so the user should consider whether this is acceptable for their
use-case.
+
+#### Error Handling
+The primary way for a user to indicate an error is to call
`completableFuture.completeExceptionally(throwable)`. Similarly, if an
exception is encountered by the system when invoking `eval`, that will also
result in an error. When an error occurs, the system will consider the retry
strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is
`NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of
[`table.exec.async-scalar.retry-delay`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be
waited, and the function call will be retried. If there have been
[`table.exec.async-scalar.max-attempts`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed
attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including
all retry attempts), the
job will fail.
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]