Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6091#discussion_r192319359
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
    @@ -209,8 +208,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
                                new ProcessingTimeCallback() {
                                        @Override
                                        public void onProcessingTime(long timestamp) throws Exception {
    -                                           streamRecordBufferEntry.completeExceptionally(
    -                                                   new TimeoutException("Async function call has timed out."));
    +                                           userFunction.timeout(element.getValue(), streamRecordBufferEntry);
    --- End diff --
    
    I have one more (possibly major) worry regarding thread safety. Here we are 
passing the `element.getValue()` object to a different thread. I wonder if this 
could break something.
    
    `userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);` at 
least allows the user to preprocess the element before handing it over to a 
different thread.
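
    To illustrate what "preprocess before handing over" could look like, here is a minimal sketch using plain `java.util.concurrent` rather than Flink. The `Event` type, `handOver` and `demo` are hypothetical names for illustration only and are not part of `AsyncWaitOperator` or the `AsyncFunction` interface. It assumes the element is mutable and may be changed by the calling thread after the hand-over.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DefensiveCopySketch {

    /** Hypothetical mutable element type, standing in for a user record. */
    static final class Event {
        String payload;

        Event(String payload) {
            this.payload = payload;
        }

        Event copy() {
            return new Event(payload);
        }
    }

    /**
     * Takes a snapshot on the calling thread, then lets only the snapshot
     * cross the thread boundary. The original is never read by the pool thread.
     */
    static CompletableFuture<String> handOver(Event element, ExecutorService pool) {
        Event snapshot = element.copy(); // runs on the caller's thread
        return CompletableFuture.supplyAsync(() -> snapshot.payload, pool);
    }

    /** Mutates the original after the hand-over and returns what the other thread saw. */
    static String demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Event element = new Event("original");
            CompletableFuture<String> result = handOver(element, pool);
            element.payload = "mutated"; // simulates the caller changing the object afterwards
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints "original"
    }
}
```

    If the raw object is passed to another thread without such a step, the receiving thread may observe later modifications made by the caller. The worry above is that a timeout callback invoked from a different thread gives the user no chance to take this kind of snapshot first.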
    
    CC @aljoscha ?


---
