rkhachatryan commented on a change in pull request #14140:
URL: https://github.com/apache/flink/pull/14140#discussion_r527746847



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
##########
@@ -401,20 +401,36 @@ public void waitForInputProcessing() throws Exception {
                        }
                }
 
-               // then wait for the Task Thread to be in a blocked state
-               // Check whether the state is blocked, this should be the case if it cannot
-               // notifyNonEmpty more input, i.e. all currently available input has been processed.
-               while (true) {
-                       Thread.State state = taskThread.getState();
-                       if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
-                                       state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
+               // Wait for all currently available input has been processed.
+               final AtomicBoolean allInputProcessed = new AtomicBoolean();
+               final MailboxProcessor mailboxProcessor = taskThread.task.mailboxProcessor;
+               final MailboxExecutor mailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
+               while (taskThread.isAlive()) {
+                       try {
+                               final CountDownLatch latch = new CountDownLatch(1);
+                               mailboxExecutor.execute(() -> {
+                                       allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());

Review comment:
       I think `isDefaultActionUnavailable()` is not the best choice here because suspension is a temporary state; some input may arrive after this check.

   What about using `mailboxProcessor.isMailboxLoopRunning()` instead?
   It is updated on `InputStatus.END_OF_INPUT`, which seems to be exactly what is needed here.
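
   To make the concern above concrete, here is a minimal toy model of the difference between a temporary "suspended" flag and a terminal "loop running" flag. It is only a sketch: the class and its methods are hypothetical stand-ins, not Flink's `MailboxProcessor`, and it assumes (as the comment suggests) that the loop-running flag is cleared once at end of input and never set again.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model only; NOT Flink's MailboxProcessor. All names here are hypothetical. */
public class SuspensionVsTermination {
    // Temporary state: set when no input is available, cleared when input arrives.
    private final AtomicBoolean suspended = new AtomicBoolean(false);
    // Terminal state: cleared once at end of input and never set again.
    private final AtomicBoolean loopRunning = new AtomicBoolean(true);

    public void noInputAvailable() { suspended.set(true); }

    public void inputArrived() {
        // New input only resumes processing while the loop is still running.
        if (loopRunning.get()) {
            suspended.set(false);
        }
    }

    public void endOfInput() { loopRunning.set(false); }

    public boolean isSuspended() { return suspended.get(); }

    public boolean isLoopRunning() { return loopRunning.get(); }

    public static void main(String[] args) {
        SuspensionVsTermination p = new SuspensionVsTermination();
        p.noInputAvailable();
        boolean observed = p.isSuspended(); // what a one-off check would see
        p.inputArrived();                   // more input shows up after the check
        System.out.println("observed=" + observed + ", now=" + p.isSuspended());
        p.endOfInput();
        System.out.println("loopRunning=" + p.isLoopRunning());
    }
}
```

   In this model, a single read of the suspended flag can be stale by the time the caller acts on it, whereas the loop-running flag only ever moves in one direction.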

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
##########
@@ -401,20 +401,36 @@ public void waitForInputProcessing() throws Exception {
                        }
                }
 
-               // then wait for the Task Thread to be in a blocked state
-               // Check whether the state is blocked, this should be the case if it cannot
-               // notifyNonEmpty more input, i.e. all currently available input has been processed.
-               while (true) {
-                       Thread.State state = taskThread.getState();
-                       if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
-                                       state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
+               // Wait for all currently available input has been processed.
+               final AtomicBoolean allInputProcessed = new AtomicBoolean();
+               final MailboxProcessor mailboxProcessor = taskThread.task.mailboxProcessor;
+               final MailboxExecutor mailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
+               while (taskThread.isAlive()) {
+                       try {
+                               final CountDownLatch latch = new CountDownLatch(1);
+                               mailboxExecutor.execute(() -> {
+                                       allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());
+                                       latch.countDown();
+                               }, "query-whether-processInput-has-suspend-itself");
+                               // Mail could be dropped due to task exception, so we do timed-await here.
+                               latch.await(1, TimeUnit.SECONDS);

Review comment:
       With this `await`, is the `sleep` below still necessary?
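
       For reference, a small standalone sketch of how `CountDownLatch.await(long, TimeUnit)` behaves, since that is what the question hinges on. The class and helper names are made up for illustration and nothing here is taken from the test harness. The call blocks for up to the timeout and returns `false` if the latch was never counted down, but returns `true` right away if it already was. Whether a separate `sleep` is redundant depends on the rest of the loop, which is not shown in this hunk.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustration only; the class and method names are hypothetical. */
public class TimedAwaitDemo {
    /** Returns true if the latch reached zero within the timeout, false otherwise. */
    public static boolean awaitBriefly(CountDownLatch latch, long millis)
            throws InterruptedException {
        return latch.await(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Case 1: nobody counts down (e.g. the mail was dropped); await blocks, then gives up.
        CountDownLatch dropped = new CountDownLatch(1);
        boolean completed = awaitBriefly(dropped, 50);

        // Case 2: the latch was already counted down; await returns without waiting.
        CountDownLatch executed = new CountDownLatch(1);
        executed.countDown();
        boolean completedAtOnce = awaitBriefly(executed, 50);

        System.out.println(completed + " " + completedAtOnce);
    }
}
```

       So the timed `await` only acts as a delay in the case where the mail never runs; when the mail runs promptly, the caller gets control back immediately.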



