This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new c02099bcbfd [FLINK-31347] Prevent 
AdaptiveSchedulerClusterITCase.testAutomaticScaleUp timing out.
c02099bcbfd is described below

commit c02099bcbfd9ec32d1e915bd3efed7dfd23597b3
Author: David Moravek <[email protected]>
AuthorDate: Mon Mar 6 19:38:02 2023 +0100

    [FLINK-31347] Prevent AdaptiveSchedulerClusterITCase.testAutomaticScaleUp 
timing out.
    
    We're starting TMs with two slots, and in case both slots don't arrive 
simultaneously, we might have an additional restart of the job. Unfortunately 
calling OnceBlockingNoOpInvokable#cancel would effectively unblock all future 
#invoke calls by setting the running flag to true. We address the issue by 
tracking this flag for each execution attempt separately.
---
 .../testtasks/OnceBlockingNoOpInvokable.java       | 38 ++++++++++++----------
 1 file changed, 20 insertions(+), 18 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
index 2e8a57cd341..210d534d3a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
@@ -19,7 +19,13 @@
 package org.apache.flink.runtime.testtasks;
 
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Mimics a task that is doing something until some external condition is 
fulfilled. {@link
@@ -33,45 +39,41 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
  */
 public class OnceBlockingNoOpInvokable extends AbstractInvokable {
 
-    private static volatile boolean isBlocking = true;
+    private static final Map<ExecutionAttemptID, CountDownLatch> 
EXECUTION_LATCHES =
+            new ConcurrentHashMap<>();
 
-    private static final Object lock = new Object();
+    private static volatile boolean isBlocking = true;
 
-    private static volatile boolean running = true;
+    private final ExecutionAttemptID executionAttemptId;
 
     public OnceBlockingNoOpInvokable(Environment environment) {
         super(environment);
+        this.executionAttemptId = environment.getExecutionId();
+        Preconditions.checkState(
+                EXECUTION_LATCHES.put(executionAttemptId, new 
CountDownLatch(1)) == null);
     }
 
     @Override
     public void invoke() throws Exception {
-        if (isBlocking) {
-            synchronized (lock) {
-                while (running) {
-                    lock.wait();
-                }
-            }
+        final CountDownLatch executionLatch =
+                
Preconditions.checkNotNull(EXECUTION_LATCHES.get(executionAttemptId));
+        while (isBlocking && executionLatch.getCount() > 0) {
+            executionLatch.await();
         }
     }
 
     @Override
     public void cancel() throws Exception {
-        synchronized (lock) {
-            running = false;
-            lock.notifyAll();
-        }
+        
Preconditions.checkNotNull(EXECUTION_LATCHES.get(executionAttemptId)).countDown();
     }
 
     public static void unblock() {
-        running = false;
         isBlocking = false;
-        synchronized (lock) {
-            lock.notifyAll();
-        }
+        EXECUTION_LATCHES.values().forEach(CountDownLatch::countDown);
     }
 
     public static void reset() {
         isBlocking = true;
-        running = true;
+        EXECUTION_LATCHES.clear();
     }
 }

Reply via email to