dmvk commented on a change in pull request #16702:
URL: https://github.com/apache/flink/pull/16702#discussion_r683544350
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
##########
@@ -82,14 +82,52 @@ public void disableFailDeploy() {
}
public List<ExecutionVertexID> getDeployedVertices() {
- return Collections.unmodifiableList(deployedVertices);
+ return deployedVertices.getVertices();
}
public List<ExecutionVertexID> getCanceledVertices() {
- return Collections.unmodifiableList(canceledVertices);
+ return canceledVertices.getVertices();
}
public List<ExecutionVertexID> getFailedVertices() {
- return Collections.unmodifiableList(failedVertices);
+ return failedVertices.getVertices();
+ }
+
+ /** Waits until the given number of vertices have been canceled. */
+ public void awaitCanceledVertices(int count) throws InterruptedException {
+ canceledVertices.await(count);
+ }
+
+ /** Waits until the given number of vertices have been failed. */
+ public void awaitFailedVertices(int count) throws InterruptedException {
+ failedVertices.await(count);
+ }
+
+ private static class CountLatch {
+ @GuardedBy("lock")
+ private final List<ExecutionVertexID> vertices = new ArrayList<>();
+
+ private final Object lock = new Object();
+
+ public void add(ExecutionVertexID executionVertexId) {
+ synchronized (lock) {
+ vertices.add(executionVertexId);
+ lock.notifyAll();
+ }
+ }
+
+ public void await(int count) throws InterruptedException {
+ synchronized (lock) {
+ while (vertices.size() != count) {
Review comment:
```suggestion
while (vertices.size() < count) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]