yuchengxin commented on code in PR #28310:
URL: https://github.com/apache/flink/pull/28310#discussion_r3506102389


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinRunner.java:
##########
@@ -144,6 +149,50 @@ public TableFunctionResultFuture<RowData> createFetcherResultFuture(Configuratio
         return resultFuture;
     }
 
+    @Override
+    public void timeout(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
+        // Find and discard the in-flight future bound to this input row so that any late
+        // completion from the underlying fetcher is ignored. The generated fetcher's own
+        // timeout method (rendered when the user UDF provides one) decides how to complete
+        // the new outResultFuture below; if no user timeout method is present, the default
+        // AsyncFunction.timeout raises a TimeoutException as before.
+        //
+        // Reference equality on leftRow is intentional: AsyncWaitOperator passes the same
+        // RowData instance to both asyncInvoke and timeout for a given record (the operator
+        // already deep-copies under object reuse).
+        JoinedRowResultFuture currentFuture = null;
+        for (JoinedRowResultFuture f : allResultFutures) {
+            if (f.leftRow == input) {
+                currentFuture = f;
+                break;
+            }
+        }
+        if (currentFuture == null || !currentFuture.inuse.compareAndSet(true, false)) {
+            // current future is already completed and reused
+            return;
+        }
+        allResultFutures.remove(currentFuture);
+        currentFuture.close();
+
+        // Route through join pipeline via new JoinedRowResultFuture
+        JoinedRowResultFuture outResultFuture =
+                new JoinedRowResultFuture(
+                        resultFutureBuffer,
+                        createFetcherResultFuture(new Configuration()),
+                        fetcherConverter,
+                        isLeftOuterJoin,
+                        rightRowSerializer.getArity());
+        outResultFuture.inuse.set(true);
+        outResultFuture.reset(input, resultFuture);
+        allResultFutures.add(outResultFuture);

Review Comment:
   I fixed this by making the replacement future recycle on both the normal and
the exceptional completion path.
   In `AsyncLookupJoinRunner`, the replacement future created in `timeout` is now
returned to `resultFutureBuffer` through a shared recycle path, so an exceptional
timeout completion no longer leaks a buffer entry. I also added regression
coverage to ensure that repeated timeout exceptions do not exhaust the buffer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
