davidradl commented on code in PR #26611: URL: https://github.com/apache/flink/pull/26611#discussion_r2113916485
########## docs/content/docs/dev/table/functions/udfs.md: ########## @@ -1172,6 +1173,153 @@ If you intend to implement or call functions in Python, please refer to the [Pyt {{< top >}} +Asynchronous Table Functions +---------------- + +Similar to `AsyncScalarFunction`, there also exists a `AsyncTableFunction` for returning multiple row results rather than a single scalar value. Similarly, this is most useful when interacting with external systems (for example when enriching stream events with data stored in a database). + +Asynchronous interaction with an external system means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cased to much higher streaming throughput. + +#### Defining an AsyncTableFunction + +A user-defined asynchronous table function maps zero, one, or multiple scalar values to zero, one, or multiple Rows, but does it asynchronously. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method. + +In order to define an asynchronous table function, one has to extend the base class `AsyncTableFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function. + +The number of outstanding calls to `eval` may be configured by `table.exec.async-scalar.buffer-capacity`. + +#### Asynchronous Semantics +While calls to an `AsyncTableFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for the use-case. + +#### Error Handling +The primary way for a user to indicate an error is to call `completableFuture.completeExceptionally(throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by `table.exec.async-table.retry-strategy`. If this is `NO_RETRY`, it will fail the job immediately. If it is set to `FIXED_DELAY`, a period of `table.exec.async-table.retry-delay` will be waited, and the function call will be retried and given another attempt to succeed. If the number of retries exceeds `table.exec.async-table.max-attempts` or if the timeout `table.exec.async-table.timeout` expires (including all retry attempts), the job will fail. Review Comment: nit: it will fail the job immediately. -> the job is failed. The it is not very clear for me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org