lincoln-lil commented on code in PR #28310:
URL: https://github.com/apache/flink/pull/28310#discussion_r3504586608
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -351,6 +353,94 @@ public static void validateClassForRuntime(
}
}
+ /**
+ * Validates whether an {@link AsyncTableFunction} subclass declares a usable {@code timeout}
+ * fallback method that mirrors the {@code eval(CompletableFuture, ...)} signature.
+ *
+ * <p>The detection uses a three-step contract: collect every method literally named {@code
+ * timeout} via {@link ExtractionUtils#collectMethods}, filter to those that are both {@code
+ * public} and non-{@code static}, and finally verify the surviving candidates against the
+ * expected {@code (CompletableFuture<Collection<T>>, argumentClasses)} signature.
+ *
+ * <ul>
+ * <li>No applicable candidate (none declared, or all
private/static/mis-spelled) — returns
+ * {@code false}; the framework falls back to {@link
+ * java.util.concurrent.TimeoutException} via the default {@code
AsyncFunction#timeout}.
+ * <li>One or more applicable candidates with a matching signature —
returns {@code true};
+ * codegen will emit a {@code fetcher.timeout(...)} dispatch.
+ * <li>One or more applicable candidates but signature mismatch — throws
{@link
+ * ValidationException} eagerly with the FQN, the expected
signature, and every actual
+ * candidate signature so users can locate the offending method
quickly.
+ * </ul>
+ */
+ public static boolean validateAsyncTableFunctionTimeoutClass(
+ Class<? extends UserDefinedFunction> functionClass,
+ Class<?>[] argumentClasses,
+ String functionName) {
+ final List<Method> candidates =
+ ExtractionUtils.collectMethods(functionClass, ASYNC_TABLE_TIMEOUT);
+ final List<Method> applicable =
+ candidates.stream()
+ .filter(
+ method ->
+ Modifier.isPublic(method.getModifiers())
+ && !Modifier.isStatic(method.getModifiers()))
+ .collect(Collectors.toList());
+ if (applicable.isEmpty()) {
+ return false;
+ }
+ // Mirror the eval convention: prepend the implicit CompletableFuture parameter so the
+ // full expected signature is `timeout(CompletableFuture, argumentClasses...)`.
+ final Class<?>[] expectedSignature = new Class<?>[argumentClasses.length + 1];
+ expectedSignature[0] = CompletableFuture.class;
+ System.arraycopy(argumentClasses, 0, expectedSignature, 1, argumentClasses.length);
+ try {
+ validateClassForRuntime(
+ functionClass,
+ ASYNC_TABLE_TIMEOUT,
+ expectedSignature,
+ void.class,
+ functionName);
+ } catch (ValidationException originalException) {
+ throw new ValidationException(
+ buildTimeoutSignatureMismatchMessage(
+ functionClass, expectedSignature, applicable),
+ originalException);
+ }
+ return true;
+ }
Review Comment:
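This validation only checks the raw `CompletableFuture` class in the first
parameter position (`expectedSignature[0] = CompletableFuture.class`), so it
does not enforce the documented `CompletableFuture<Collection<...>>` generic
parity contract.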
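
One possible way to check the generic shape is plain reflection over `Method#getGenericParameterTypes()`. The following is a minimal sketch, not the PR's implementation: the class `TimeoutGenericParityCheck`, the helper `firstParamIsFutureOfCollection`, and the `Good`/`Bad` classes are hypothetical names used only for illustration. It checks only that the first parameter is a `CompletableFuture` parameterized with a `Collection` type; it does not compare the element type against `eval`, and it rejects wildcard arguments such as `? extends Collection<T>`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class TimeoutGenericParityCheck {

    /** Returns true if the first parameter is declared as CompletableFuture<Collection<...>>. */
    public static boolean firstParamIsFutureOfCollection(Method method) {
        Type[] params = method.getGenericParameterTypes();
        if (params.length == 0 || !(params[0] instanceof ParameterizedType)) {
            return false; // no parameters, or a raw (non-generic) first parameter
        }
        ParameterizedType future = (ParameterizedType) params[0];
        if (future.getRawType() != CompletableFuture.class) {
            return false;
        }
        // Unwrap Collection<T> to its raw class; a plain Class argument is used as-is.
        Type arg = future.getActualTypeArguments()[0];
        Type raw = arg instanceof ParameterizedType ? ((ParameterizedType) arg).getRawType() : arg;
        return raw instanceof Class && Collection.class.isAssignableFrom((Class<?>) raw);
    }

    /** Hypothetical UDF-like class whose timeout method has the documented shape. */
    public static class Good {
        public void timeout(CompletableFuture<Collection<String>> future, Integer key) {}
    }

    /** Hypothetical UDF-like class that passes a raw-class check but breaks generic parity. */
    public static class Bad {
        public void timeout(CompletableFuture<String> future, Integer key) {}
    }

    public static void main(String[] args) throws Exception {
        Method good = Good.class.getMethod("timeout", CompletableFuture.class, Integer.class);
        Method bad = Bad.class.getMethod("timeout", CompletableFuture.class, Integer.class);
        System.out.println(firstParamIsFutureOfCollection(good)); // prints "true"
        System.out.println(firstParamIsFutureOfCollection(bad)); // prints "false"
    }
}
```

Both `Good` and `Bad` resolve through a raw `getMethod("timeout", CompletableFuture.class, ...)` lookup, which is the gap this comment points at.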
##########
flink-table/flink-table-planner/src/test/resources/log4j2-test.properties:
##########
@@ -32,3 +32,6 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
#logger.testlogger.name =org.apache.flink.table.planner.codegen
#logger.testlogger.level = TRACE
#logger.testlogger.appenderRefs = TestLogger
+loggers = compileutils
+logger.compileutils.name = org.apache.flink.table.runtime.generated.CompileUtils
+logger.compileutils.level = DEBUG
Review Comment:
This widens planner test logging from its quiet-by-default setting to
INFO + DEBUG, which seems like an unnecessary change.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinRunner.java:
##########
@@ -144,6 +149,50 @@ public TableFunctionResultFuture<RowData> createFetcherResultFuture(Configuratio
return resultFuture;
}
+ @Override
+ public void timeout(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
+ // Find and discard the in-flight future bound to this input row so that any late
+ // completion from the underlying fetcher is ignored. The generated fetcher's own
+ // timeout method (rendered when the user UDF provides one) decides how to complete
+ // the new outResultFuture below; if no user timeout method is present, the default
+ // AsyncFunction.timeout raises a TimeoutException as before.
+ //
+ // Reference equality on leftRow is intentional: AsyncWaitOperator passes the same
+ // RowData instance to both asyncInvoke and timeout for a given record (the operator
+ // already deep-copies under object reuse).
+ JoinedRowResultFuture currentFuture = null;
+ for (JoinedRowResultFuture f : allResultFutures) {
+ if (f.leftRow == input) {
+ currentFuture = f;
+ break;
+ }
+ }
+ if (currentFuture == null || !currentFuture.inuse.compareAndSet(true, false)) {
+ // current future is already completed and reused
+ return;
+ }
+ allResultFutures.remove(currentFuture);
+ currentFuture.close();
+
+ // Route through join pipeline via new JoinedRowResultFuture
+ JoinedRowResultFuture outResultFuture =
+ new JoinedRowResultFuture(
+ resultFutureBuffer,
+ createFetcherResultFuture(new Configuration()),
+ fetcherConverter,
+ isLeftOuterJoin,
+ rightRowSerializer.getArity());
+ outResultFuture.inuse.set(true);
+ outResultFuture.reset(input, resultFuture);
+ allResultFutures.add(outResultFuture);
Review Comment:
This tracks `outResultFuture` in `allResultFutures` again, but does not
recycle it into `resultFutureBuffer` when the timeout completes exceptionally.
That can drain the pooled `resultFutureBuffer` and deadlock later lookups.
Besides fixing this, a proper failure test case should be added.
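
To illustrate the concern in isolation, here is a minimal sketch of how a fixed-size pool drains when a borrowed object is not returned on the failure path. It assumes the pool behaves like a bounded `BlockingQueue`; `PooledFutureRecycleSketch`, `PooledFuture`, and `runFailingTimeouts` are hypothetical names and are not part of `AsyncLookupJoinRunner`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PooledFutureRecycleSketch {

    /** Hypothetical stand-in for a pooled result future. */
    static final class PooledFuture {}

    /**
     * Borrows one pooled future per simulated timeout, fails each completion, and
     * optionally recycles the future on the failure path. Returns the number of
     * futures left in the pool afterwards.
     */
    public static int runFailingTimeouts(int poolSize, int timeouts, boolean recycleOnFailure)
            throws InterruptedException {
        BlockingQueue<PooledFuture> pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new PooledFuture());
        }
        for (int i = 0; i < timeouts; i++) {
            // A bounded poll stands in for a blocking take(), which would hang once drained.
            PooledFuture future = pool.poll(10, TimeUnit.MILLISECONDS);
            if (future == null) {
                break; // pool drained: later lookups could not obtain a future
            }
            try {
                throw new IllegalStateException("simulated exceptional timeout completion");
            } catch (IllegalStateException e) {
                if (recycleOnFailure) {
                    pool.put(future); // return the future so later lookups can reuse it
                }
            }
        }
        return pool.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runFailingTimeouts(2, 3, false)); // prints "0"
        System.out.println(runFailingTimeouts(2, 3, true)); // prints "2"
    }
}
```

A failure test along these lines could assert that the pool size is unchanged after a timeout that completes exceptionally.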