Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6091#discussion_r192319359
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
---
@@ -209,8 +208,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
                     new ProcessingTimeCallback() {
                         @Override
                         public void onProcessingTime(long timestamp) throws Exception {
    -                        streamRecordBufferEntry.completeExceptionally(
    -                            new TimeoutException("Async function call has timed out."));
    +                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
--- End diff --
I have one more (possibly major) worry regarding thread safety. Here we are
passing the `element.getValue()` object to a different thread, and I wonder if
this could break something.
`userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);` at
least allows the user to preprocess the element before handing it over to a
different thread.
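To make the worry concrete, here is a rough, Flink-free sketch of the kind of problem I have in mind. Everything in it is hypothetical: `Event` stands in for a mutable user `IN` type, the single-thread executor stands in for the timer thread, and the mutation stands in for anything that changes the object after the callback has captured it. It only illustrates the general hazard; it does not claim that the operator actually behaves this way.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class TimeoutHandoffSketch {

    /** Hypothetical mutable input type, standing in for a user's IN value. */
    static final class Event {
        private String payload;
        Event(String payload) { this.payload = payload; }
        synchronized String getPayload() { return payload; }
        synchronized void setPayload(String payload) { this.payload = payload; }
    }

    /**
     * Captures an element in a callback that runs on another thread, mutates
     * the element on the calling thread, and returns what the callback saw.
     */
    static String observedByTimer(boolean snapshotBeforeHandoff) {
        Event element = new Event("original");
        // Snapshot taken on the calling thread, which is what preprocessing
        // in asyncInvoke would allow the user to do.
        String snapshot = element.getPayload();
        CountDownLatch mutated = new CountDownLatch(1);
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            Future<String> seen = timer.submit(() -> {
                mutated.await(); // the "timeout" fires after the mutation
                return snapshotBeforeHandoff ? snapshot : element.getPayload();
            });
            element.setPayload("mutated"); // assumed later change to the object
            mutated.countDown();
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observedByTimer(false)); // prints "mutated"
        System.out.println(observedByTimer(true));  // prints "original"
    }
}
```

With the raw object handed over, the callback sees whatever state the object has when the timer fires. With a snapshot taken beforehand, it sees the state at hand-off time.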
CC @aljoscha ?
---