This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ac6317f  [FLINK-21963] Harden ReactiveModeITCase
ac6317f is described below

commit ac6317fd5bcaf90a88dee74e0171a55933395907
Author: Robert Metzger <[email protected]>
AuthorDate: Mon Mar 29 15:26:50 2021 +0200

    [FLINK-21963] Harden ReactiveModeITCase
    
    This commit fixes a test instability in
    ReactiveModeITCase.testScaleDownOnTaskManagerLoss() where a CountDownLatch
    was counted down by an execution of the test source that got cancelled for
    a scale-up event (despite its name, a scale-up can also happen in this
    test). The instability only occurs when there is enough time between the
    notifications of new slots for the source to start in between. On fast
    machines, the execution gets cancelled during scheduling, because the
    slo [...]
    
    With this commit, we no longer use a latch; instead, we count the number
    of running instances explicitly.
    
    This closes #15417.
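
    For context, here is a minimal, hypothetical sketch of the counting
    approach described above (it mirrors, but is not, the
    InstanceParallelismTracker added in the diff below): instances are counted
    explicitly under a lock, and waiters are only notified on scale-up, so an
    execution that is cancelled and restarted cannot corrupt the count the way
    a pre-sized CountDownLatch can.

        // Illustrative only; the real tracker added by this commit is in the diff below.
        class InstanceCounter {
            private final Object lock = new Object();
            private int instances = 0;

            // Called from the source's run(): register an instance and wake up waiters.
            void reportNewInstance() {
                synchronized (lock) {
                    instances++;
                    lock.notifyAll();
                }
            }

            // Called from the source's close(): deregister without notifying,
            // so waiters only observe scale-up events.
            void reportStoppedInstance() {
                synchronized (lock) {
                    instances--;
                }
            }

            // Block until exactly the expected parallelism is running.
            void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
                synchronized (lock) {
                    while (instances != parallelism) {
                        lock.wait();
                    }
                }
            }
        }

    The source reports itself in run()/close(), and the test simply waits for
    the target parallelism instead of pre-declaring an expected instance count.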
---
 .../flink/test/scheduling/ReactiveModeITCase.java  | 177 ++++++++++++---------
 1 file changed, 102 insertions(+), 75 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index 5dd2a64..d0ca6e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -29,20 +29,16 @@ import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.time.Duration;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assume.assumeTrue;
@@ -86,71 +82,52 @@ public class ReactiveModeITCase extends TestLogger {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // we set maxParallelism = 1 and assert it never exceeds it
         final DataStream<String> input =
-                env.addSource(new ParallelismTrackingSource()).setMaxParallelism(1);
-        input.addSink(new ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
-
-        ParallelismTrackingSource.expectInstances(1);
-        ParallelismTrackingSink.expectInstances(1);
+                env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
+        input.addSink(new DiscardingSink<>());
 
         env.executeAsync();
 
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
+        FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
     }
 
     /** Test that a job scales up when a TaskManager gets added to the cluster. */
     @Test
     public void testScaleUpOnAdditionalTaskManager() throws Exception {
+        ParallelismTrackingSource.resetParallelismTracker();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
-        input.addSink(new ParallelismTrackingSink<>());
-
-        ParallelismTrackingSource.expectInstances(
-                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
-        ParallelismTrackingSink.expectInstances(
-                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+        input.addSink(new DiscardingSink<>());
 
         env.executeAsync();
 
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
-
-        // expect scale up to 2 TaskManagers:
-        ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
-        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        ParallelismTrackingSource.waitForScaleUpToParallelism(
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
 
+        // scale up to 2 TaskManagers:
         miniClusterResource.getMiniCluster().startTaskManager();
-
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
+        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
     }
 
     @Test
     public void testScaleDownOnTaskManagerLoss() throws Exception {
+        ParallelismTrackingSource.resetParallelismTracker();
         // test preparation: ensure we have 2 TaskManagers running
         startAdditionalTaskManager();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+        // configure exactly one restart to avoid restart loops in error cases
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
         final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
-        input.addSink(new ParallelismTrackingSink<>());
-
-        ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
-        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        input.addSink(new DiscardingSink<>());
 
         env.executeAsync();
 
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
+        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
 
         // scale down to 1 TaskManagers:
-        ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
-        ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
-
         miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
 
-        ParallelismTrackingSource.waitForInstances();
-        ParallelismTrackingSink.waitForInstances();
+        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER);
     }
 
     private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
@@ -168,27 +145,41 @@ public class ReactiveModeITCase extends TestLogger {
                 Deadline.fromNow(Duration.ofMillis(10_000L)));
     }
 
-    private static class ParallelismTrackingSource implements ParallelSourceFunction<String> {
+    /**
+     * This source is tracking its parallelism internally. We can not use a CountDownLatch with a
+     * predefined parallelism. When scheduling this source on more than one TaskManager in Reactive
+     * Mode, it can happen that the source gets scheduled once the first TaskManager registers. In
+     * this execution, the source would count down the latch by one already, but Reactive Mode would
+     * trigger a restart once the next TaskManager arrives, ultimately breaking the count of the
+     * latch.
+     *
+     * <p>This approach is a compromise that just tracks the number of running instances and allows
+     * the test to wait for a parallelism to be reached. To avoid accidentally reaching the scale
+     * while deallocating source instances, the {@link InstanceParallelismTracker} is only notifying
+     * the wait method when new instances are added, not when they are removed.
+     */
+    private static class ParallelismTrackingSource extends RichParallelSourceFunction<String> {
         private volatile boolean running = true;
 
-        private static final InstanceTracker instances = new InstanceTracker();
+        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
 
-        public static void expectInstances(int count) {
-            instances.expectInstances(count);
+        public static void waitForScaleUpToParallelism(int parallelism)
+                throws InterruptedException {
+            tracker.waitForScaleUpToParallelism(parallelism);
         }
 
-        public static void waitForInstances() throws InterruptedException {
-            instances.waitForInstances();
+        public static void resetParallelismTracker() {
+            tracker.reset();
         }
 
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
-            instances.reportNewInstance();
+            tracker.reportNewInstance();
             while (running) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect("test");
                 }
-                Thread.sleep(100);
+                Thread.sleep(10);
             }
         }
 
@@ -196,52 +187,88 @@ public class ReactiveModeITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
+
+        @Override
+        public void close() throws Exception {
+            tracker.reportStoppedInstance();
+        }
     }
 
-    private static class ParallelismTrackingSink<T> extends RichSinkFunction<T> {
+    private static class InstanceParallelismTracker {
+        // only notify this lock on scale-up
+        private final Object lock = new Object();
 
-        private static final InstanceTracker instances = new InstanceTracker();
+        private int instances = 0;
 
-        public static void expectInstances(int count) {
-            instances.expectInstances(count);
+        public void reportStoppedInstance() {
+            synchronized (lock) {
+                instances--;
+            }
         }
 
-        public static void waitForInstances() throws InterruptedException {
-            instances.waitForInstances();
+        public void reportNewInstance() {
+            synchronized (lock) {
+                instances++;
+                lock.notifyAll();
+            }
         }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            instances.reportNewInstance();
+        public void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
+            synchronized (lock) {
+                while (instances != parallelism) {
+                    lock.wait();
+                }
+            }
+        }
+
+        public void reset() {
+            synchronized (lock) {
+                instances = 0;
+            }
         }
     }
 
-    private static class InstanceTracker {
-        private final Object lock = new Object();
+    private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
+        private volatile boolean running = true;
 
-        @GuardedBy("lock")
-        private CountDownLatch latch = new CountDownLatch(0);
+        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
 
-        public void reportNewInstance() {
-            synchronized (lock) {
-                if (latch.getCount() == 0) {
-                    throw new RuntimeException("Test error. More instances than expected.");
+        public static void waitForScaleUpToParallelism(int parallelism)
+                throws InterruptedException {
+            tracker.waitForScaleUpToParallelism(parallelism);
+        }
+
+        public static void resetParallelismTracker() {
+            tracker.reset();
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
+                throw new IllegalStateException(
+                        "This is not supposed to be executed in parallel, despite extending the right base class.");
+            }
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            tracker.reportNewInstance();
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect("test");
                 }
-                latch.countDown();
+                Thread.sleep(100);
             }
         }
 
-        public void waitForInstances() throws InterruptedException {
-            //noinspection FieldAccessNotGuarded
-            latch.await();
+        @Override
+        public void cancel() {
+            running = false;
         }
 
-        public void expectInstances(int count) {
-            synchronized (lock) {
-                Preconditions.checkState(
-                        latch.getCount() == 0, "Assuming previous latch has triggered");
-                latch = new CountDownLatch(count);
-            }
+        @Override
+        public void close() throws Exception {
+            tracker.reportStoppedInstance();
         }
     }
 }
