kezhuw commented on a change in pull request #14140:
URL: https://github.com/apache/flink/pull/14140#discussion_r527781089



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
##########
@@ -401,20 +401,36 @@ public void waitForInputProcessing() throws Exception {
                        }
                }
 
-               // then wait for the Task Thread to be in a blocked state
-               // Check whether the state is blocked, this should be the case 
if it cannot
-               // notifyNonEmpty more input, i.e. all currently available 
input has been processed.
-               while (true) {
-                       Thread.State state = taskThread.getState();
-                       if (state == Thread.State.BLOCKED || state == 
Thread.State.TERMINATED ||
-                                       state == Thread.State.WAITING || state 
== Thread.State.TIMED_WAITING) {
+               // Wait for all currently available input has been processed.
+               final AtomicBoolean allInputProcessed = new AtomicBoolean();
+               final MailboxProcessor mailboxProcessor = 
taskThread.task.mailboxProcessor;
+               final MailboxExecutor mailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
+               while (taskThread.isAlive()) {
+                       try {
+                               final CountDownLatch latch = new 
CountDownLatch(1);
+                               mailboxExecutor.execute(() -> {
+                                       
allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());
+                                       latch.countDown();
+                               }, 
"query-whether-processInput-has-suspend-itself");
+                               // Mail could be dropped due to task exception, 
so we do timed-await here.
+                               latch.await(1, TimeUnit.SECONDS);

Review comment:
       This `await` has two purposes here:
   1. Wait until post mail has been processed, so we can query 
`allInputProcessed` safely.
   2. If post mail has been dropped due to task exception, break out indefinite 
wait.
   
   It does not serve as sleeping to yield control to mailbox thread. Without 
`sleep`, testing thread and mailbox thread may do ping-pong game between 
process-one-element and execute-one-mail.
   
   I tend to keep it, it does not affect correctness at least.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to