tillrohrmann commented on a change in pull request #15417:
URL: https://github.com/apache/flink/pull/15417#discussion_r605641290



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -196,52 +192,91 @@ public void run(SourceContext<String> ctx) throws 
Exception {
         public void cancel() {
             running = false;
         }
+
+        @Override
+        public void close() throws Exception {
+            tracker.reportStoppedInstance();
+        }
     }
 
-    private static class ParallelismTrackingSink<T> extends 
RichSinkFunction<T> {
+    private static class InstanceParallelismTracker {
+        // only notify this lock on scale-up
+        private final Object lock = new Object();
 
-        private static final InstanceTracker instances = new InstanceTracker();
+        private volatile int instances = 0;
 
-        public static void expectInstances(int count) {
-            instances.expectInstances(count);
+        public void reportStoppedInstance() {
+            synchronized (lock) {
+                instances--;
+            }
         }
 
-        public static void waitForInstances() throws InterruptedException {
-            instances.waitForInstances();
+        public void reportNewInstance() {
+            synchronized (lock) {
+                instances++;
+                lock.notifyAll();
+            }
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            instances.reportNewInstance();
+        public void waitForScaleUpToParallelism(int parallelism) throws 
InterruptedException {
+            while (true) {
+                synchronized (lock) {
+                    if (instances == parallelism) {
+                        break;
+                    }
+                    lock.wait();
+                }
+            }
+        }
+
+        public void reset() {
+            synchronized (lock) {
+                instances = 0;
+            }
         }
     }
 
-    private static class InstanceTracker {
-        private final Object lock = new Object();
+    private static class FailOnParallelExecutionSource extends 
RichParallelSourceFunction<String> {
+        private volatile boolean running = true;
 
-        @GuardedBy("lock")
-        private CountDownLatch latch = new CountDownLatch(0);
+        private static final InstanceParallelismTracker tracker = new 
InstanceParallelismTracker();
 
-        public void reportNewInstance() {
-            synchronized (lock) {
-                if (latch.getCount() == 0) {
-                    throw new RuntimeException("Test error. More instances 
than expected.");
+        public static void waitForScaleUpToParallelism(int parallelism)
+                throws InterruptedException {
+            tracker.waitForScaleUpToParallelism(parallelism);
+        }
+
+        public static void resetParallelismTracker() {
+            tracker.reset();
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
+                throw new IllegalStateException(
+                        "This is not supposed to be executed in parallel, 
despite extending the right base class.");
+            }
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            tracker.reportNewInstance();
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect("test");
                 }
-                latch.countDown();
+                Thread.sleep(100);

Review comment:
       Can this be decreased to `10` or `20`? Otherwise the test will run for 
at least 100 ms.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -86,71 +82,57 @@ public void testScaleLimitByMaxParallelism() throws 
Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         // we set maxParallelism = 1 and assert it never exceeds it
         final DataStream<String> input =
-                env.addSource(new 
ParallelismTrackingSource()).setMaxParallelism(1);
-        input.addSink(new 
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
-
-        ParallelismTrackingSource.expectInstances(1);
-        ParallelismTrackingSink.expectInstances(1);
+                env.addSource(new 
FailOnParallelExecutionSource()).setMaxParallelism(1);
+        input.addSink(new DiscardingSink<>());
 
         env.executeAsync();
 
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
+        FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
     }
 
     /** Test that a job scales up when a TaskManager gets added to the 
cluster. */
     @Test
     public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        ParallelismTrackingSource.resetParallelismTracker();
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         final DataStream<String> input = env.addSource(new 
ParallelismTrackingSource());
-        input.addSink(new ParallelismTrackingSink<>());
-
-        ParallelismTrackingSource.expectInstances(
-                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
-        ParallelismTrackingSink.expectInstances(
-                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+        input.addSink(new DiscardingSink<>());
 
         env.executeAsync();
 
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
-
-        // expect scale up to 2 TaskManagers:
-        
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
-        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER 
* 2);
+        ParallelismTrackingSource.waitForScaleUpToParallelism(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
 
+        // scale up to 2 TaskManagers:
         miniClusterResource.getMiniCluster().startTaskManager();
-
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
+        
ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER
 * 2);
     }
 
     @Test
     public void testScaleDownOnTaskManagerLoss() throws Exception {
+        ParallelismTrackingSource.resetParallelismTracker();
         // test preparation: ensure we have 2 TaskManagers running
         startAdditionalTaskManager();
+        log.info("2 TaskManagers running, submitting job.");

Review comment:
       Is this logging statement needed?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -196,52 +192,91 @@ public void run(SourceContext<String> ctx) throws 
Exception {
         public void cancel() {
             running = false;
         }
+
+        @Override
+        public void close() throws Exception {
+            tracker.reportStoppedInstance();
+        }
     }
 
-    private static class ParallelismTrackingSink<T> extends 
RichSinkFunction<T> {
+    private static class InstanceParallelismTracker {
+        // only notify this lock on scale-up
+        private final Object lock = new Object();
 
-        private static final InstanceTracker instances = new InstanceTracker();
+        private volatile int instances = 0;

Review comment:
       Why is this field volatile?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -196,52 +192,91 @@ public void run(SourceContext<String> ctx) throws 
Exception {
         public void cancel() {
             running = false;
         }
+
+        @Override
+        public void close() throws Exception {
+            tracker.reportStoppedInstance();
+        }
     }
 
-    private static class ParallelismTrackingSink<T> extends 
RichSinkFunction<T> {
+    private static class InstanceParallelismTracker {
+        // only notify this lock on scale-up
+        private final Object lock = new Object();
 
-        private static final InstanceTracker instances = new InstanceTracker();
+        private volatile int instances = 0;
 
-        public static void expectInstances(int count) {
-            instances.expectInstances(count);
+        public void reportStoppedInstance() {
+            synchronized (lock) {
+                instances--;
+            }
         }
 
-        public static void waitForInstances() throws InterruptedException {
-            instances.waitForInstances();
+        public void reportNewInstance() {
+            synchronized (lock) {
+                instances++;
+                lock.notifyAll();
+            }
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            instances.reportNewInstance();
+        public void waitForScaleUpToParallelism(int parallelism) throws 
InterruptedException {
+            while (true) {
+                synchronized (lock) {
+                    if (instances == parallelism) {
+                        break;
+                    }
+                    lock.wait();
+                }
+            }

Review comment:
       If I am not mistaken, the usual way to write this wait loop is:
   
   ```
   synchronized (lock) {
     while (instances != parallelism) {
       lock.wait();
     }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to