yuchengxin opened a new pull request, #28310:
URL: https://github.com/apache/flink/pull/28310
---
What is the purpose of the change
This pull request introduces a user-defined timeout handler for
AsyncTableFunction. When an async invocation exceeds the configured timeout,
the framework now dispatches to a matching timeout method on the function, so
users can return a fallback row or surface a domain-specific exception instead
of unconditionally failing the record (or the job) with the default
TimeoutException. The mechanism is wired through both call sites that consume
AsyncTableFunction today: async lookup join and async correlate
(table-function lateral join).
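To make the convention concrete, here is a minimal sketch of the shape a function with a timeout handler might take. It is deliberately free of Flink dependencies: UserLookupSketch, the Collection<String> result type and the Integer key are hypothetical stand-ins, and the main method only simulates the framework's dispatch. The real AsyncTableFunction subclass and row types in this PR may differ.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an AsyncTableFunction subclass; it has no Flink
// dependency so the shape of the convention can be shown in isolation.
class UserLookupSketch {

    // Async entry point: the result future comes first, followed by the lookup key.
    public void eval(CompletableFuture<Collection<String>> future, Integer userId) {
        // Deliberately never completes, standing in for a slow external lookup.
    }

    // Timeout handler: same parameter list as eval, and it completes the
    // future synchronously before returning.
    public void timeout(CompletableFuture<Collection<String>> future, Integer userId) {
        future.complete(Collections.singletonList("unknown-user-" + userId));
    }

    public static void main(String[] args) {
        UserLookupSketch fn = new UserLookupSketch();
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        fn.eval(future, 42);

        // Simulate the framework noticing the timeout and dispatching to the handler.
        if (!future.isDone()) {
            fn.timeout(future, 42);
        }
        System.out.println(future.join()); // prints [unknown-user-42]
    }
}
```

A handler could equally call future.completeExceptionally(...) or throw, which corresponds to the "surface a domain-specific exception" path described above.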
Brief change log
- AsyncTableFunction: documented the timeout convention (signature parity
  with eval, synchronous completion on the mailbox thread, exception
  transparency, overload resolution, empty-collection fallback semantics).
- UserDefinedFunctionHelper: added validation so that, when a timeout method
  is present, its visibility, modifiers and parameter list are checked at
  registration time. The first parameter must be a CompletableFuture with the
  same generic type as in eval; the remaining parameters must be the lookup
  keys, with the same types and order.
- Codegen path: extended BridgingFunctionGenUtil, FunctionCallCodeGenerator,
  FunctionCodeGenerator, AsyncCorrelateCodeGenerator, AsyncCodeGenerator and
  LookupJoinCodeGenerator to:
  - resolve the matching timeout overload against the current call site's
    lookup-key types,
  - emit a timeout(...) dispatch on the generated async function,
  - fail fast during planning with a ValidationException (including FQN,
    expected signature and actual candidates) when a timeout method exists
    with a non-assignable signature.
- Runtime:
  - DelegatingAsyncTableResultFuture invokes the user-supplied timeout
    handler on the mailbox thread, enforces synchronous completion (it checks
    future.isDone() after the call and fails with IllegalStateException if
    the handler returned without completing the future), and propagates
    synchronous exceptions to the downstream ResultFuture.
  - AsyncLookupJoinRunner / AsyncCorrelateRunner route timeout events
    through the new handler and preserve INNER/LEFT OUTER semantics for
    empty-collection completions.
- Default behavior (no timeout method) is unchanged: the framework keeps
emitting the original TimeoutException.
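The runtime guard described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in DelegatingAsyncTableResultFuture: TimeoutDispatchSketch and dispatch are hypothetical names, a plain CompletableFuture stands in for the downstream ResultFuture, and the mailbox-thread aspect is not modelled.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch of the "handler must complete the future synchronously" guard.
class TimeoutDispatchSketch {

    // Runs the handler, then verifies that it completed the future before returning.
    static CompletableFuture<Collection<String>> dispatch(
            Consumer<CompletableFuture<Collection<String>>> handler) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        try {
            handler.accept(future);
        } catch (Throwable t) {
            // A synchronous exception from the handler is propagated as the result.
            future.completeExceptionally(t);
            return future;
        }
        if (!future.isDone()) {
            // The handler returned without completing the future: short-circuit.
            future.completeExceptionally(new IllegalStateException(
                    "timeout handler returned without completing the future"));
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> ok =
                dispatch(f -> f.complete(Collections.singletonList("fallback")));
        CompletableFuture<Collection<String>> forgot = dispatch(f -> { });
        CompletableFuture<Collection<String>> threw =
                dispatch(f -> { throw new IllegalArgumentException("bad key"); });

        System.out.println(ok.join());                          // prints [fallback]
        System.out.println(forgot.isCompletedExceptionally());  // prints true
        System.out.println(threw.isCompletedExceptionally());   // prints true
    }
}
```

Checking isDone() right after the call is what turns "forgot to complete" from a silent hang into an immediate, diagnosable failure.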
Verifying this change
This change added tests and can be verified as follows:
- Unit tests
  - UserDefinedFunctionHelperTest: validates positive cases plus rejection
    of bad timeout signatures (wrong arity, mismatched key types, missing
    CompletableFuture, non-public, static, etc.).
  - DelegatingAsyncTableResultFutureTest: covers synchronous-complete,
    synchronous-throw, and the "did not complete the future before returning"
    guard.
  - AsyncCorrelateRunnerTest / AsyncLookupJoinRunnerTimeoutTest: cover
    dispatch into the user timeout handler, fallback-row emission, exception
    propagation, and overload resolution against multiple eval/timeout pairs.
- E2E SQL tests
  - AsyncTableFunctionTimeoutE2ETest, AsyncCorrelateTimeoutE2ETest,
    AsyncLookupJoinTimeoutE2ETest, backed by a new
    TimeoutAsyncLookupTableFactory, exercise the full plan→codegen→runtime
    path for both lookup join (INNER and LEFT OUTER) and async correlate,
    asserting that fallback rows, NULL-padded rows and user-thrown exceptions
    surface as documented.
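The kinds of signature rejections listed for UserDefinedFunctionHelperTest can be illustrated with plain reflection. The sketch below is an assumption-laden simplification, not the validation added in this PR: TimeoutSignatureCheckSketch, check, find and the fixture classes are all hypothetical, and it requires exact equality of generic parameter types rather than whatever assignability rules the planner applies.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Hypothetical reflection-based check that a timeout method mirrors eval.
class TimeoutSignatureCheckSketch {

    // Returns null when the signature is acceptable, otherwise a short reason.
    static String check(Method eval, Method timeout) {
        int mods = timeout.getModifiers();
        if (!Modifier.isPublic(mods)) return "timeout must be public";
        if (Modifier.isStatic(mods)) return "timeout must not be static";

        Type[] expected = eval.getGenericParameterTypes();
        Type[] actual = timeout.getGenericParameterTypes();
        if (actual.length == 0 || timeout.getParameterTypes()[0] != CompletableFuture.class) {
            return "first parameter must be a CompletableFuture";
        }
        if (actual.length != expected.length) return "wrong arity";
        // Compares the future's generic type and every key type, in order.
        if (!Arrays.equals(expected, actual)) return "parameter types differ from eval";
        return null;
    }

    // Finds the first declared method with the given name (enough for these fixtures).
    static Method find(Class<?> type, String name) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(name)) return m;
        }
        throw new IllegalArgumentException("no method " + name + " on " + type);
    }

    static class Good {
        public void eval(CompletableFuture<Collection<String>> f, Integer key) { }
        public void timeout(CompletableFuture<Collection<String>> f, Integer key) { }
    }

    static class WrongKeyType {
        public void eval(CompletableFuture<Collection<String>> f, Integer key) { }
        public void timeout(CompletableFuture<Collection<String>> f, String key) { }
    }

    static class NotPublic {
        public void eval(CompletableFuture<Collection<String>> f, Integer key) { }
        void timeout(CompletableFuture<Collection<String>> f, Integer key) { }
    }

    public static void main(String[] args) {
        Class<?>[] fixtures = {Good.class, WrongKeyType.class, NotPublic.class};
        for (Class<?> c : fixtures) {
            System.out.println(c.getSimpleName() + ": " + check(find(c, "eval"), find(c, "timeout")));
        }
    }
}
```

In this sketch only the Good fixture yields null; the other two yield a reason string, loosely mirroring the "mismatched key types" and "non-public" cases named in the test list.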
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
  @Public(Evolving): yes (AsyncTableFunction, additive: a new optional
  timeout convention is recognized; existing subclasses without a timeout
  method are unaffected)
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes (only
  when a timeout method is declared; the dispatch sits on the existing
  async-timeout path and runs at most once per timed-out record)
- Anything that affects deployment or recovery (JobManager, Checkpointing,
K8s/Yarn, ZooKeeper): no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs (the timeout convention,
  constraints and dispatch/error behavior are documented on
  AsyncTableFunction, with a worked example). User-facing docs for
  AsyncTableFunction lookup-join timeouts can be added in a follow-up
  doc-only commit if the reviewers prefer.
Was generative AI tooling used to co-author this PR?
- Yes — Claude Code (claude-opus-4-7)
Generated-by: Claude Code (claude-opus-4-7)