This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ac6317f [FLINK-21963] Harden ReactiveModeITCase
ac6317f is described below
commit ac6317fd5bcaf90a88dee74e0171a55933395907
Author: Robert Metzger <[email protected]>
AuthorDate: Mon Mar 29 15:26:50 2021 +0200
[FLINK-21963] Harden ReactiveModeITCase
This commit fixes a test instability in
ReactiveModeITCase.testScaleDownOnTaskManagerLoss() where a CountDownLatch was
counted down on an execution of the test source that got cancelled for a scale
up event (despite its name, a scale up can happen in the test as well). This
test instability only happens if there is enough time between the notifications
of new slots for the source to start in between. On fast machines, the
execution gets cancelled during scheduling, because the slo [...]
With this commit, we no longer use a latch; instead, we explicitly count the
number of running instances.
This closes #15417.
---
.../flink/test/scheduling/ReactiveModeITCase.java | 177 ++++++++++++---------
1 file changed, 102 insertions(+), 75 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index 5dd2a64..d0ca6e0 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -29,20 +29,16 @@ import
org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import javax.annotation.concurrent.GuardedBy;
-
import java.time.Duration;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.junit.Assume.assumeTrue;
@@ -86,71 +82,52 @@ public class ReactiveModeITCase extends TestLogger {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// we set maxParallelism = 1 and assert it never exceeds it
final DataStream<String> input =
- env.addSource(new
ParallelismTrackingSource()).setMaxParallelism(1);
- input.addSink(new
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
-
- ParallelismTrackingSource.expectInstances(1);
- ParallelismTrackingSink.expectInstances(1);
+ env.addSource(new
FailOnParallelExecutionSource()).setMaxParallelism(1);
+ input.addSink(new DiscardingSink<>());
env.executeAsync();
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
+ FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
}
/** Test that a job scales up when a TaskManager gets added to the
cluster. */
@Test
public void testScaleUpOnAdditionalTaskManager() throws Exception {
+ ParallelismTrackingSource.resetParallelismTracker();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<String> input = env.addSource(new
ParallelismTrackingSource());
- input.addSink(new ParallelismTrackingSink<>());
-
- ParallelismTrackingSource.expectInstances(
- NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
- ParallelismTrackingSink.expectInstances(
- NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+ input.addSink(new DiscardingSink<>());
env.executeAsync();
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
-
- // expect scale up to 2 TaskManagers:
-
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
- ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER
* 2);
+ ParallelismTrackingSource.waitForScaleUpToParallelism(
+ NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+ // scale up to 2 TaskManagers:
miniClusterResource.getMiniCluster().startTaskManager();
-
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
+
ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER
* 2);
}
@Test
public void testScaleDownOnTaskManagerLoss() throws Exception {
+ ParallelismTrackingSource.resetParallelismTracker();
// test preparation: ensure we have 2 TaskManagers running
startAdditionalTaskManager();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
0L));
+ // configure exactly one restart to avoid restart loops in error cases
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
final DataStream<String> input = env.addSource(new
ParallelismTrackingSource());
- input.addSink(new ParallelismTrackingSink<>());
-
-
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
- ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER
* 2);
+ input.addSink(new DiscardingSink<>());
env.executeAsync();
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
+
ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER
* 2);
// scale down to 1 TaskManagers:
-
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
- ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER);
-
miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
+
ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER);
}
private int getNumberOfConnectedTaskManagers() throws ExecutionException,
InterruptedException {
@@ -168,27 +145,41 @@ public class ReactiveModeITCase extends TestLogger {
Deadline.fromNow(Duration.ofMillis(10_000L)));
}
- private static class ParallelismTrackingSource implements
ParallelSourceFunction<String> {
+ /**
+ * This source is tracking its parallelism internally. We can not use a
CountDownLatch with a
+ * predefined parallelism. When scheduling this source on more than one
TaskManager in Reactive
+ * Mode, it can happen that the source gets scheduled once the first
TaskManager registers. In
+ * this execution, the source would count down the latch by one already,
but Reactive Mode would
+ * trigger a restart once the next TaskManager arrives, ultimately
breaking the count of the
+ * latch.
+ *
+ * <p>This approach is a compromise that just tracks the number of running
instances and allows
+ * the test to wait for a parallelism to be reached. To avoid accidentally
reaching the scale
+ * while deallocating source instances, the {@link
InstanceParallelismTracker} is only notifying
+ * the wait method when new instances are added, not when they are removed.
+ */
+ private static class ParallelismTrackingSource extends
RichParallelSourceFunction<String> {
private volatile boolean running = true;
- private static final InstanceTracker instances = new InstanceTracker();
+ private static final InstanceParallelismTracker tracker = new
InstanceParallelismTracker();
- public static void expectInstances(int count) {
- instances.expectInstances(count);
+ public static void waitForScaleUpToParallelism(int parallelism)
+ throws InterruptedException {
+ tracker.waitForScaleUpToParallelism(parallelism);
}
- public static void waitForInstances() throws InterruptedException {
- instances.waitForInstances();
+ public static void resetParallelismTracker() {
+ tracker.reset();
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
- instances.reportNewInstance();
+ tracker.reportNewInstance();
while (running) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect("test");
}
- Thread.sleep(100);
+ Thread.sleep(10);
}
}
@@ -196,52 +187,88 @@ public class ReactiveModeITCase extends TestLogger {
public void cancel() {
running = false;
}
+
+ @Override
+ public void close() throws Exception {
+ tracker.reportStoppedInstance();
+ }
}
- private static class ParallelismTrackingSink<T> extends
RichSinkFunction<T> {
+ private static class InstanceParallelismTracker {
+ // only notify this lock on scale-up
+ private final Object lock = new Object();
- private static final InstanceTracker instances = new InstanceTracker();
+ private int instances = 0;
- public static void expectInstances(int count) {
- instances.expectInstances(count);
+ public void reportStoppedInstance() {
+ synchronized (lock) {
+ instances--;
+ }
}
- public static void waitForInstances() throws InterruptedException {
- instances.waitForInstances();
+ public void reportNewInstance() {
+ synchronized (lock) {
+ instances++;
+ lock.notifyAll();
+ }
}
- @Override
- public void open(Configuration parameters) throws Exception {
- instances.reportNewInstance();
+ public void waitForScaleUpToParallelism(int parallelism) throws
InterruptedException {
+ synchronized (lock) {
+ while (instances != parallelism) {
+ lock.wait();
+ }
+ }
+ }
+
+ public void reset() {
+ synchronized (lock) {
+ instances = 0;
+ }
}
}
- private static class InstanceTracker {
- private final Object lock = new Object();
+ private static class FailOnParallelExecutionSource extends
RichParallelSourceFunction<String> {
+ private volatile boolean running = true;
- @GuardedBy("lock")
- private CountDownLatch latch = new CountDownLatch(0);
+ private static final InstanceParallelismTracker tracker = new
InstanceParallelismTracker();
- public void reportNewInstance() {
- synchronized (lock) {
- if (latch.getCount() == 0) {
- throw new RuntimeException("Test error. More instances
than expected.");
+ public static void waitForScaleUpToParallelism(int parallelism)
+ throws InterruptedException {
+ tracker.waitForScaleUpToParallelism(parallelism);
+ }
+
+ public static void resetParallelismTracker() {
+ tracker.reset();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
+ throw new IllegalStateException(
+ "This is not supposed to be executed in parallel,
despite extending the right base class.");
+ }
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ tracker.reportNewInstance();
+ while (running) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect("test");
}
- latch.countDown();
+ Thread.sleep(100);
}
}
- public void waitForInstances() throws InterruptedException {
- //noinspection FieldAccessNotGuarded
- latch.await();
+ @Override
+ public void cancel() {
+ running = false;
}
- public void expectInstances(int count) {
- synchronized (lock) {
- Preconditions.checkState(
- latch.getCount() == 0, "Assuming previous latch has
triggered");
- latch = new CountDownLatch(count);
- }
+ @Override
+ public void close() throws Exception {
+ tracker.reportStoppedInstance();
}
}
}