AHeise commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r329919625
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -293,140 +252,120 @@ public void close() throws Exception {
                        waitInFlightInputsFinished();
                }
                finally {
-                       Exception exception = null;
-
-                       try {
-                               super.close();
-                       } catch (InterruptedException interrupted) {
-                               exception = interrupted;
-
-                               Thread.currentThread().interrupt();
-                       } catch (Exception e) {
-                               exception = e;
-                       }
-
-                       try {
-                               // terminate the emitter, the emitter thread 
and the executor
-                               stopResources(true);
-                       } catch (InterruptedException interrupted) {
-                               exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-                               Thread.currentThread().interrupt();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-
-                       if (exception != null) {
-                               LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-                       }
+                       super.close();
                }
        }
 
-       @Override
-       public void dispose() throws Exception {
-               Exception exception = null;
+       /**
+        * Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+        * has been added.
+        *
+        * <p>Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+        * as asynchronous results can be processed.
+        *
+        * @param streamElement to add to the operator's queue
+        * @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+        * @return a handle that allows to set the result of the async 
computation for the given element.
+        */
+       private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+               assert(Thread.holdsLock(checkpointingLock));
 
-               try {
-                       super.dispose();
-               } catch (InterruptedException interrupted) {
-                       exception = interrupted;
+               pendingStreamElement = streamElement;
 
-                       Thread.currentThread().interrupt();
-               } catch (Exception e) {
-                       exception = e;
+               Optional<ResultFuture<OUT>> queueEntry;
+               while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+                       mailboxExecutor.yield();
                }
 
-               try {
-                       stopResources(false);
-               } catch (InterruptedException interrupted) {
-                       exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+               pendingStreamElement = null;
 
-                       Thread.currentThread().interrupt();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
+               return queueEntry.get();
+       }
+
+       private void waitInFlightInputsFinished() throws InterruptedException {
+               assert(Thread.holdsLock(checkpointingLock));
 
-               if (exception != null) {
-                       throw exception;
+               while (!queue.isEmpty()) {
+                       mailboxExecutor.yield();
                }
        }
 
        /**
-        * Close the operator's resources. They include the emitter thread and 
the executor to run
-        * the queue's complete operation.
+        * Batch output of all completed elements. Watermarks are always 
completed if it's their turn to be processed.
         *
-        * @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-        *                           otherwise false.
-        * @throws InterruptedException if current thread has been interrupted
+        * <p>This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+        * of an async function call.
         */
-       private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-               emitter.stop();
-               emitterThread.interrupt();
-
-               executor.shutdown();
-
-               if (waitForShutdown) {
-                       try {
-                               if (!executor.awaitTermination(365L, 
TimeUnit.DAYS)) {
-                                       executor.shutdownNow();
-                               }
-                       } catch (InterruptedException e) {
-                               executor.shutdownNow();
-
-                               Thread.currentThread().interrupt();
+       private void outputCompletedElements() {
+               if (queue.hasCompletedElements()) {
+                       // emit one element
+                       synchronized (checkpointingLock) {
+                               
queue.emitCompletedElement(this.timestampedCollector);
                        }
-
-                       /*
-                        * FLINK-5638: If we have the checkpoint lock we might 
have to free it for a while so
-                        * that the emitter thread can complete/react to the 
interrupt signal.
-                        */
-                       if (Thread.holdsLock(checkpointingLock)) {
-                               while (emitterThread.isAlive()) {
-                                       checkpointingLock.wait(100L);
-                               }
+                       // if there are more completed elements, emit them with 
subsequent mails
+                       if (queue.hasCompletedElements()) {
 
 Review comment:
   Unfortunately, it would not work completely.
   
   Let's say we have two items i1, i2 in an ordered queue and they complete in 
reverse order. Then the async callback of i2 will not emit any element. The 
async callback of i2 will emit only the first element i1. So, i2 is stuck. Of 
course, it will be emitted as soon as any later item completed, but we have an 
artificial lag and for bounded inputs, we would actually never emit the last 
few elements.
   
   Originally, `emitCompletedElement` emitted all elements, so we wouldn't need 
to enqueue an additional mail. However, that may block the mailbox for a longer 
time, such that interleaved events will not be processed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to