Github user kisimple commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192329480

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
    @@ -209,8 +208,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
     				new ProcessingTimeCallback() {
     					@Override
     					public void onProcessingTime(long timestamp) throws Exception {
    -						streamRecordBufferEntry.completeExceptionally(
    -							new TimeoutException("Async function call has timed out."));
    +						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
    --- End diff --

    Highly appreciate all the reviews :) I have pushed the fixup.
---