AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328133817
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -209,41 +176,34 @@ else if (element.isLatencyMarker()) {
        }
 
        @Override
-       public void processElement(StreamRecord<IN> element) throws Exception {
-               final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
+       public void processElement(final StreamRecord<IN> element) throws Exception {
+               // add element first to the queue
+               final ResultFuture<OUT> entry = addToWorkQueue(element);
+
+               final ResultHandler resultHandler = new ResultHandler(element, entry);
 
+               // register a timeout for the entry if timeout is configured
                 if (timeout > 0L) {
-                       // register a timeout for this AsyncStreamRecordBufferEntry
-                       long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
-
-                       final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
-                               timeoutTimestamp,
-                               new ProcessingTimeCallback() {
-                                       @Override
-                                       public void onProcessingTime(long timestamp) throws Exception {
-                                               userFunction.timeout(element.getValue(), streamRecordBufferEntry);
-                                       }
-                               });
-
-                       // Cancel the timer once we've completed the stream record buffer entry. This will remove
-                       // the register trigger task
-                       streamRecordBufferEntry.onComplete(
-                               (StreamElementQueueEntry<Collection<OUT>> value) -> {
-                                       timerFuture.cancel(true);
-                               },
-                               executor);
-               }
+                       final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
 
-               addAsyncBufferEntry(streamRecordBufferEntry);
+                       final ScheduledFuture<?> timeoutTimer = getProcessingTimeService().registerTimer(
+                                       timeoutTimestamp,
+                                       timestamp -> userFunction.timeout(element.getValue(), resultHandler));
 
-               userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
+                       resultHandler.setTimeoutTimer(timeoutTimer);
 
 Review comment:
   Hm, that's definitely not what I expected from timers, and I'm pretty sure it's hard for end-users to understand. My basic expectation is: if I cancel a timer from one thread, then it will not execute afterwards in that same thread.
   
   I would suggest hotfixing that in general by adding a safeguard at the start of the timer letter. WDYT? @1u0 
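
   For illustration, a minimal sketch of what such a safeguard could look like. Everything here is hypothetical: `GuardedTimerSketch`, `GuardedCallback`, and `demo` are made-up names, and `LongConsumer` is only a stand-in for the real timer callback type; this is not how the PR or Flink implements it.

```java
import java.util.function.LongConsumer;

/** Hypothetical illustration only; none of these names exist in Flink. */
public class GuardedTimerSketch {

    /**
     * Wraps a timer callback so that it turns into a no-op once cancelled.
     * LongConsumer stands in for the real timer callback type (assumption).
     */
    static final class GuardedCallback implements LongConsumer {
        private final LongConsumer delegate;
        // volatile so that a cancel issued from another thread is visible too
        private volatile boolean cancelled;

        GuardedCallback(LongConsumer delegate) {
            this.delegate = delegate;
        }

        /** Marks the callback as cancelled; later invocations do nothing. */
        void cancel() {
            cancelled = true;
        }

        @Override
        public void accept(long timestamp) {
            // The safeguard: check the flag first, before any user code runs.
            if (cancelled) {
                return;
            }
            delegate.accept(timestamp);
        }
    }

    /** Fires once, cancels, fires again; returns how often the delegate ran. */
    static int demo() {
        int[] fired = {0};
        GuardedCallback callback = new GuardedCallback(ts -> fired[0]++);
        callback.accept(1L); // delegate runs
        callback.cancel();
        callback.accept(2L); // skipped by the safeguard
        return fired[0];
    }
}
```

   With a wrapper like this, a callback that was already enqueued before the cancel would still be dequeued, but it would return immediately instead of calling into the user function.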
