This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new c02099bcbfd [FLINK-31347] Prevent AdaptiveSchedulerClusterITCase.testAutomaticScaleUp timing out.
c02099bcbfd is described below
commit c02099bcbfd9ec32d1e915bd3efed7dfd23597b3
Author: David Moravek <[email protected]>
AuthorDate: Mon Mar 6 19:38:02 2023 +0100
[FLINK-31347] Prevent AdaptiveSchedulerClusterITCase.testAutomaticScaleUp timing out.
We're starting TaskManagers (TMs) with two slots, and if both slots don't arrive
simultaneously, the job might be restarted one additional time. Unfortunately,
calling OnceBlockingNoOpInvokable#cancel would effectively unblock all future
#invoke calls, because it set the shared static `running` flag to false. We address
the issue by tracking the blocking state for each execution attempt separately
(one CountDownLatch per ExecutionAttemptID).
---
.../testtasks/OnceBlockingNoOpInvokable.java | 38 ++++++++++++----------
1 file changed, 20 insertions(+), 18 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
index 2e8a57cd341..210d534d3a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/OnceBlockingNoOpInvokable.java
@@ -19,7 +19,13 @@
package org.apache.flink.runtime.testtasks;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
/**
* Mimics a task that is doing something until some external condition is
fulfilled. {@link
@@ -33,45 +39,41 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
*/
public class OnceBlockingNoOpInvokable extends AbstractInvokable {
- private static volatile boolean isBlocking = true;
+ private static final Map<ExecutionAttemptID, CountDownLatch>
EXECUTION_LATCHES =
+ new ConcurrentHashMap<>();
- private static final Object lock = new Object();
+ private static volatile boolean isBlocking = true;
- private static volatile boolean running = true;
+ private final ExecutionAttemptID executionAttemptId;
public OnceBlockingNoOpInvokable(Environment environment) {
super(environment);
+ this.executionAttemptId = environment.getExecutionId();
+ Preconditions.checkState(
+ EXECUTION_LATCHES.put(executionAttemptId, new
CountDownLatch(1)) == null);
}
@Override
public void invoke() throws Exception {
- if (isBlocking) {
- synchronized (lock) {
- while (running) {
- lock.wait();
- }
- }
+ final CountDownLatch executionLatch =
+
Preconditions.checkNotNull(EXECUTION_LATCHES.get(executionAttemptId));
+ while (isBlocking && executionLatch.getCount() > 0) {
+ executionLatch.await();
}
}
@Override
public void cancel() throws Exception {
- synchronized (lock) {
- running = false;
- lock.notifyAll();
- }
+
Preconditions.checkNotNull(EXECUTION_LATCHES.get(executionAttemptId)).countDown();
}
public static void unblock() {
- running = false;
isBlocking = false;
- synchronized (lock) {
- lock.notifyAll();
- }
+ EXECUTION_LATCHES.values().forEach(CountDownLatch::countDown);
}
public static void reset() {
isBlocking = true;
- running = true;
+ EXECUTION_LATCHES.clear();
}
}