rmetzger commented on a change in pull request #15417:
URL: https://github.com/apache/flink/pull/15417#discussion_r606151837



##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -196,52 +192,91 @@ public void run(SourceContext<String> ctx) throws Exception {
         public void cancel() {
             running = false;
         }
+
+        @Override
+        public void close() throws Exception {
+            tracker.reportStoppedInstance();
+        }
     }
 
-    private static class ParallelismTrackingSink<T> extends RichSinkFunction<T> {
+    private static class InstanceParallelismTracker {
+        // only notify this lock on scale-up
+        private final Object lock = new Object();
 
-        private static final InstanceTracker instances = new InstanceTracker();
+        private volatile int instances = 0;
 
-        public static void expectInstances(int count) {
-            instances.expectInstances(count);
+        public void reportStoppedInstance() {
+            synchronized (lock) {
+                instances--;
+            }
         }
 
-        public static void waitForInstances() throws InterruptedException {
-            instances.waitForInstances();
+        public void reportNewInstance() {
+            synchronized (lock) {
+                instances++;
+                lock.notifyAll();
+            }
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            instances.reportNewInstance();
+        public void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
+            while (true) {
+                synchronized (lock) {
+                    if (instances == parallelism) {
+                        break;
+                    }
+                    lock.wait();
+                }
+            }

Review comment:
       True, in this case we don't need to release the lock while waiting.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to