twalthr commented on code in PR #26610: URL: https://github.com/apache/flink/pull/26610#discussion_r2231092048
########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. + +#### Is AsyncScalarFunction Required? Review Comment: ```suggestion #### AsyncScalarFunction vs. ScalarFunction ``` ########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. + +#### Is AsyncScalarFunction Required? +One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. + + +The following example shows how to do work on a thread pool in the background, though any libraries exposing an async interface may be directly used to complete the `CompletableFuture` from a callback. See the [Implementation Guide](#implementation-guide) for more details. + +```java +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.AsyncScalarFunction; + +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import static org.apache.flink.table.api.Expressions.*; + +public static class BackgroundFunction extends AsyncScalarFunction { + private Executor executor; + + @Override + public void open(FunctionContext context) { + executor = Executors.newFixedThreadPool(10); + } + + // take any data type and return INT Review Comment: ```suggestion ``` ########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. + +#### Is AsyncScalarFunction Required? +One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. + + +The following example shows how to do work on a thread pool in the background, though any libraries exposing an async interface may be directly used to complete the `CompletableFuture` from a callback. See the [Implementation Guide](#implementation-guide) for more details. + +```java +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.AsyncScalarFunction; + +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import static org.apache.flink.table.api.Expressions.*; + +public static class BackgroundFunction extends AsyncScalarFunction { Review Comment: Add JavaDoc ########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. + +#### Is AsyncScalarFunction Required? +One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. Review Comment: ```suggestion One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. ``` ########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. Review Comment: ```suggestion The primary way for a user to indicate an error is to call `CompletableFuture.completeExceptionally(Throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), th e job will fail. ``` ########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. + +#### Is AsyncScalarFunction Required? +One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. Review Comment: Also mentions that AsyncScalarFunction are not supported in parts of a SQL query. When in doubt use ScalarFunction instead. ########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. + +#### Is AsyncScalarFunction Required? +One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. + + +The following example shows how to do work on a thread pool in the background, though any libraries exposing an async interface may be directly used to complete the `CompletableFuture` from a callback. See the [Implementation Guide](#implementation-guide) for more details. + +```java +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.AsyncScalarFunction; + +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import static org.apache.flink.table.api.Expressions.*; + +public static class BackgroundFunction extends AsyncScalarFunction { + private Executor executor; + + @Override + public void open(FunctionContext context) { + executor = Executors.newFixedThreadPool(10); Review Comment: A SQL persona might have never worked with a `CompletableFuture` or `Executor`. Maybe add a bit more inline comments what each step does. ########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1017,6 +1018,88 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Scalar Functions +---------------- + +When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time. + +Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time. + +To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput. + +{{< img src="/fig/async_io.svg" width="50%" >}} + +#### Defining an AsyncScalarFunction + +A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}). + +#### Asynchronous Semantics +While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail. + +#### Is AsyncScalarFunction Required? +One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. + + +The following example shows how to do work on a thread pool in the background, though any libraries exposing an async interface may be directly used to complete the `CompletableFuture` from a callback. See the [Implementation Guide](#implementation-guide) for more details. Review Comment: Move the example up into `Defining an AsyncScalarFunction`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org