yuchengxin opened a new pull request, #28310:
URL: https://github.com/apache/flink/pull/28310

   ---
     What is the purpose of the change
   
     This pull request introduces a user-defined timeout handler for
     AsyncTableFunction. When an async invocation exceeds the configured
     timeout, the framework now dispatches to a matching timeout method on
     the function, so users can return a fallback row or surface a
     domain-specific exception instead of unconditionally failing the record
     (or the job) with the default TimeoutException. The mechanism is wired
     through both call sites that consume AsyncTableFunction today: async
     lookup join and async correlate (table-function lateral join).
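
A minimal sketch of what an eval/timeout pair could look like under this convention. It is plain Java so that it runs on its own: `UserLookupSketch` and `TimeoutDispatchDemo` are hypothetical names, the class does not actually extend AsyncTableFunction, and the "framework" is reduced to an immediate check for a pending future rather than a real timer.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a class that would extend AsyncTableFunction<String>.
class UserLookupSketch {

    // Regular async path. Negative ids simulate a backend that never answers,
    // so the future is left pending for them.
    public void eval(CompletableFuture<Collection<String>> future, Integer userId) {
        if (userId >= 0) {
            future.complete(List.of("user-" + userId));
        }
    }

    // Timeout path: same parameter list as eval. It either completes the
    // future with a fallback row or throws, both before returning.
    public void timeout(CompletableFuture<Collection<String>> future, Integer userId) {
        if (userId == -1) {
            future.complete(List.of("unknown-user"));
            return;
        }
        throw new IllegalStateException("lookup timed out for id " + userId);
    }
}

class TimeoutDispatchDemo {

    // Simulated dispatch (assumption): a future still pending after eval is
    // treated as timed out and handed to the timeout method.
    static Collection<String> lookup(UserLookupSketch fn, Integer userId) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        fn.eval(future, userId);
        if (!future.isDone()) {
            fn.timeout(future, userId);
        }
        return future.join();
    }

    public static void main(String[] args) {
        UserLookupSketch fn = new UserLookupSketch();
        System.out.println(lookup(fn, 7));   // normal path
        System.out.println(lookup(fn, -1));  // fallback row from timeout
    }
}
```

In this sketch the fallback row and the user-thrown exception are the two outcomes the description mentions; how the real runtime schedules the timeout is not modeled here.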
   
     Brief change log
   
     - AsyncTableFunction: documented the timeout convention (signature parity
       with eval, synchronous completion on the mailbox thread, exception
       transparency, overload resolution, empty-collection fallback semantics).
     - UserDefinedFunctionHelper: added validation so that, when a timeout
       method is present, its visibility, modifiers and parameter list are
       checked at registration time: the first parameter must be a
       CompletableFuture with the same generic type as in eval, and the
       remaining parameters must be the lookup keys with the same types and
       order.
     - Codegen path: extended BridgingFunctionGenUtil,
       FunctionCallCodeGenerator, FunctionCodeGenerator,
       AsyncCorrelateCodeGenerator, AsyncCodeGenerator and
       LookupJoinCodeGenerator to:
       - resolve the matching timeout overload against the current call site's
         lookup-key types,
       - emit a timeout(...) dispatch on the generated async function,
       - fail fast during planning with a ValidationException (including FQN,
         expected signature and actual candidates) when a timeout method exists
         with a non-assignable signature.
     - Runtime:
       - DelegatingAsyncTableResultFuture invokes the user-supplied timeout
         handler on the mailbox thread, enforces synchronous completion (a
         future.isDone() check after the call; otherwise it short-circuits with
         an IllegalStateException), and propagates synchronous exceptions to
         the downstream ResultFuture.
       - AsyncLookupJoinRunner / AsyncCorrelateRunner route timeout events
         through the new handler and preserve INNER/LEFT OUTER semantics for
         empty-collection completions.
     - Default behavior (no timeout method) is unchanged: the framework keeps
       emitting the original TimeoutException.
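
The synchronous-completion guard described under Runtime can be sketched roughly as follows. This is an illustration, not the code in DelegatingAsyncTableResultFuture: `TimeoutGuardSketch`, `dispatchTimeout` and `failureOf` are hypothetical names, and a plain CompletableFuture stands in for the downstream ResultFuture.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

class TimeoutGuardSketch {

    // Invokes the user handler, forwards anything it throws synchronously,
    // and rejects handlers that return while the future is still pending.
    static <T> void dispatchTimeout(
            CompletableFuture<Collection<T>> future,
            Consumer<CompletableFuture<Collection<T>>> handler) {
        try {
            handler.accept(future);
        } catch (RuntimeException e) {
            // Synchronous exception: surface it downstream unchanged.
            future.completeExceptionally(e);
            return;
        }
        if (!future.isDone()) {
            // The handler deferred completion, which the convention forbids.
            future.completeExceptionally(new IllegalStateException(
                    "timeout handler returned without completing the future"));
        }
    }

    // Test helper: returns the failure cause, or null if the future succeeded.
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> pending = new CompletableFuture<>();
        dispatchTimeout(pending, f -> { /* forgets to complete */ });
        System.out.println(failureOf(pending));
    }
}
```

Checking isDone() right after the call keeps the handler on the calling thread's timeline: a handler that hands the future to another thread and returns is reported as a contract violation instead of being waited on.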
   
     Verifying this change
   
     This change added tests and can be verified as follows:
   
     - Unit tests
       - UserDefinedFunctionHelperTest: validates positive cases plus rejection
         of bad timeout signatures (wrong arity, mismatched key types, missing
         CompletableFuture, non-public, static, etc.).
       - DelegatingAsyncTableResultFutureTest: covers synchronous-complete,
         synchronous-throw, and the "did not complete the future before
         returning" guard.
       - AsyncCorrelateRunnerTest / AsyncLookupJoinRunnerTimeoutTest: cover
         dispatch into the user timeout handler, fallback-row emission,
         exception propagation, and overload resolution against multiple
         eval/timeout pairs.
     - E2E SQL tests
       - AsyncTableFunctionTimeoutE2ETest, AsyncCorrelateTimeoutE2ETest,
         AsyncLookupJoinTimeoutE2ETest, backed by a new
         TimeoutAsyncLookupTableFactory, exercise the full
         plan→codegen→runtime path for both lookup join (INNER and LEFT OUTER)
         and async correlate, asserting that fallback rows, NULL-padded rows
         and user-thrown exceptions surface as documented.
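
As a rough illustration of the join behavior these tests assert, the sketch below shows what happens when a timeout handler completes with an empty collection. It assumes standard join semantics (INNER drops the left row, LEFT OUTER emits it NULL-padded). `EmptyFallbackSketch` and its string-based rows are hypothetical and unrelated to the actual runner classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class EmptyFallbackSketch {

    // Joins one left row with the rows the lookup (or timeout handler) returned.
    static List<String> join(String left, Collection<String> rightRows, boolean leftOuter) {
        List<String> out = new ArrayList<>();
        if (rightRows.isEmpty()) {
            // Empty completion: nothing for INNER, one NULL-padded row for LEFT OUTER.
            if (leftOuter) {
                out.add(left + ",NULL");
            }
            return out;
        }
        for (String right : rightRows) {
            out.add(left + "," + right);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(join("order-1", List.of(), false));
        System.out.println(join("order-1", List.of(), true));
        System.out.println(join("order-1", List.of("fallback"), false));
    }
}
```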
   
     Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with
       @Public(Evolving): yes (AsyncTableFunction, additive: a new optional
       timeout convention is recognized; existing subclasses without a timeout
       method are unaffected)
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes (only
       when a timeout method is declared; the dispatch sits on the existing
       async-timeout path and runs at most once per timed-out record)
     - Anything that affects deployment or recovery (JobManager,
       Checkpointing, K8s/Yarn, ZooKeeper): no
     - The S3 file system connector: no
   
     Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs (the timeout convention,
       constraints and dispatch/error behavior are documented on
       AsyncTableFunction, with a worked example). User-facing docs for
       AsyncTableFunction lookup-join timeouts can be added in a follow-up
       doc-only commit if the reviewers prefer.
   
     Was generative AI tooling used to co-author this PR?
   
     - Yes — Claude Code (claude-opus-4-7)
   
     Generated-by: Claude Code (claude-opus-4-7)

