kezhuw commented on a change in pull request #14140:
URL: https://github.com/apache/flink/pull/14140#discussion_r527763762



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
##########
@@ -401,20 +401,36 @@ public void waitForInputProcessing() throws Exception {
                        }
                }
 
-               // then wait for the Task Thread to be in a blocked state
-               // Check whether the state is blocked, this should be the case if it cannot
-               // notifyNonEmpty more input, i.e. all currently available input has been processed.
-               while (true) {
-                       Thread.State state = taskThread.getState();
-                       if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
-                                       state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
+               // Wait for all currently available input has been processed.
+               final AtomicBoolean allInputProcessed = new AtomicBoolean();
+               final MailboxProcessor mailboxProcessor = taskThread.task.mailboxProcessor;
+               final MailboxExecutor mailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
+               while (taskThread.isAlive()) {
+                       try {
+                               final CountDownLatch latch = new CountDownLatch(1);
+                               mailboxExecutor.execute(() -> {
+                                       allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());

Review comment:
       The name `allInputProcessed` may be misleading: it is meant to express
that all currently available input has been processed, not that the end of
input has been reached. `StreamTaskTestHarness.waitForInputProcessing` waits
until the currently available input has been processed, so that the test
code that follows can make its post-processing assertions.
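
To illustrate the "wait until currently available input is processed" idea,
here is a minimal, Flink-free sketch. It assumes a single-threaded executor
as a stand-in for the task's mailbox thread; `DrainProbeSketch` and
`awaitQueuedWork` are hypothetical names, not part of the PR or of Flink.
The probe only shows the enqueue-then-latch pattern and does not check
`isDefaultActionUnavailable()` as the change under review does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the test harness; not Flink code.
class DrainProbeSketch {

    /**
     * Waits until every task submitted to the single-threaded executor
     * before this call has run. Returns false if the timeout elapses first.
     */
    public static boolean awaitQueuedWork(
            ExecutorService singleThreadExecutor, long timeout, TimeUnit unit)
            throws InterruptedException {
        CountDownLatch probeRan = new CountDownLatch(1);
        // A single-threaded executor runs tasks sequentially in submission
        // order, so the probe runs only after all previously queued work.
        singleThreadExecutor.execute(probeRan::countDown);
        return probeRan.await(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());

        // Simulate three records that are "currently available".
        for (int i = 0; i < 3; i++) {
            final int record = i;
            taskThread.execute(() -> processed.add(record));
        }

        // After this returns true, post-processing assertions are safe.
        boolean drained = awaitQueuedWork(taskThread, 5, TimeUnit.SECONDS);
        System.out.println("drained=" + drained + " processed=" + processed);
        taskThread.shutdown();
    }
}
```

Unlike polling `Thread.getState()`, the latch is released by the worker
thread itself, so the caller does not have to infer progress from the
thread's scheduling state.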



