tillrohrmann commented on a change in pull request #15417:
URL: https://github.com/apache/flink/pull/15417#discussion_r605641290
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -196,52 +192,91 @@ public void run(SourceContext<String> ctx) throws
Exception {
public void cancel() {
running = false;
}
+
+ @Override
+ public void close() throws Exception {
+ tracker.reportStoppedInstance();
+ }
}
- private static class ParallelismTrackingSink<T> extends
RichSinkFunction<T> {
+ private static class InstanceParallelismTracker {
+ // only notify this lock on scale-up
+ private final Object lock = new Object();
- private static final InstanceTracker instances = new InstanceTracker();
+ private volatile int instances = 0;
- public static void expectInstances(int count) {
- instances.expectInstances(count);
+ public void reportStoppedInstance() {
+ synchronized (lock) {
+ instances--;
+ }
}
- public static void waitForInstances() throws InterruptedException {
- instances.waitForInstances();
+ public void reportNewInstance() {
+ synchronized (lock) {
+ instances++;
+ lock.notifyAll();
+ }
}
- @Override
- public void open(Configuration parameters) throws Exception {
- instances.reportNewInstance();
+ public void waitForScaleUpToParallelism(int parallelism) throws
InterruptedException {
+ while (true) {
+ synchronized (lock) {
+ if (instances == parallelism) {
+ break;
+ }
+ lock.wait();
+ }
+ }
+ }
+
+ public void reset() {
+ synchronized (lock) {
+ instances = 0;
+ }
}
}
- private static class InstanceTracker {
- private final Object lock = new Object();
+ private static class FailOnParallelExecutionSource extends
RichParallelSourceFunction<String> {
+ private volatile boolean running = true;
- @GuardedBy("lock")
- private CountDownLatch latch = new CountDownLatch(0);
+ private static final InstanceParallelismTracker tracker = new
InstanceParallelismTracker();
- public void reportNewInstance() {
- synchronized (lock) {
- if (latch.getCount() == 0) {
- throw new RuntimeException("Test error. More instances
than expected.");
+ public static void waitForScaleUpToParallelism(int parallelism)
+ throws InterruptedException {
+ tracker.waitForScaleUpToParallelism(parallelism);
+ }
+
+ public static void resetParallelismTracker() {
+ tracker.reset();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
+ throw new IllegalStateException(
+ "This is not supposed to be executed in parallel,
despite extending the right base class.");
+ }
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ tracker.reportNewInstance();
+ while (running) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect("test");
}
- latch.countDown();
+ Thread.sleep(100);
Review comment:
Can this be decreased to `10` or `20`? Otherwise the test will run at
least 100ms.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -86,71 +82,57 @@ public void testScaleLimitByMaxParallelism() throws
Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// we set maxParallelism = 1 and assert it never exceeds it
final DataStream<String> input =
- env.addSource(new
ParallelismTrackingSource()).setMaxParallelism(1);
- input.addSink(new
ParallelismTrackingSink<>()).getTransformation().setMaxParallelism(1);
-
- ParallelismTrackingSource.expectInstances(1);
- ParallelismTrackingSink.expectInstances(1);
+ env.addSource(new
FailOnParallelExecutionSource()).setMaxParallelism(1);
+ input.addSink(new DiscardingSink<>());
env.executeAsync();
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
+ FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
}
/** Test that a job scales up when a TaskManager gets added to the
cluster. */
@Test
public void testScaleUpOnAdditionalTaskManager() throws Exception {
+ ParallelismTrackingSource.resetParallelismTracker();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<String> input = env.addSource(new
ParallelismTrackingSource());
- input.addSink(new ParallelismTrackingSink<>());
-
- ParallelismTrackingSource.expectInstances(
- NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
- ParallelismTrackingSink.expectInstances(
- NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+ input.addSink(new DiscardingSink<>());
env.executeAsync();
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
-
- // expect scale up to 2 TaskManagers:
-
ParallelismTrackingSource.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
- ParallelismTrackingSink.expectInstances(NUMBER_SLOTS_PER_TASK_MANAGER
* 2);
+ ParallelismTrackingSource.waitForScaleUpToParallelism(
+ NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
+ // scale up to 2 TaskManagers:
miniClusterResource.getMiniCluster().startTaskManager();
-
- ParallelismTrackingSource.waitForInstances();
- ParallelismTrackingSink.waitForInstances();
+
ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER
* 2);
}
@Test
public void testScaleDownOnTaskManagerLoss() throws Exception {
+ ParallelismTrackingSource.resetParallelismTracker();
// test preparation: ensure we have 2 TaskManagers running
startAdditionalTaskManager();
+ log.info("2 TaskManagers running, submitting job.");
Review comment:
Is this logging statement needed?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -196,52 +192,91 @@ public void run(SourceContext<String> ctx) throws
Exception {
public void cancel() {
running = false;
}
+
+ @Override
+ public void close() throws Exception {
+ tracker.reportStoppedInstance();
+ }
}
- private static class ParallelismTrackingSink<T> extends
RichSinkFunction<T> {
+ private static class InstanceParallelismTracker {
+ // only notify this lock on scale-up
+ private final Object lock = new Object();
- private static final InstanceTracker instances = new InstanceTracker();
+ private volatile int instances = 0;
Review comment:
Why is this field volatile?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
##########
@@ -196,52 +192,91 @@ public void run(SourceContext<String> ctx) throws
Exception {
public void cancel() {
running = false;
}
+
+ @Override
+ public void close() throws Exception {
+ tracker.reportStoppedInstance();
+ }
}
- private static class ParallelismTrackingSink<T> extends
RichSinkFunction<T> {
+ private static class InstanceParallelismTracker {
+ // only notify this lock on scale-up
+ private final Object lock = new Object();
- private static final InstanceTracker instances = new InstanceTracker();
+ private volatile int instances = 0;
- public static void expectInstances(int count) {
- instances.expectInstances(count);
+ public void reportStoppedInstance() {
+ synchronized (lock) {
+ instances--;
+ }
}
- public static void waitForInstances() throws InterruptedException {
- instances.waitForInstances();
+ public void reportNewInstance() {
+ synchronized (lock) {
+ instances++;
+ lock.notifyAll();
+ }
}
- @Override
- public void open(Configuration parameters) throws Exception {
- instances.reportNewInstance();
+ public void waitForScaleUpToParallelism(int parallelism) throws
InterruptedException {
+ while (true) {
+ synchronized (lock) {
+ if (instances == parallelism) {
+ break;
+ }
+ lock.wait();
+ }
+ }
Review comment:
If I am not mistaken, then you would write the following loop:
```
synchronized (lock) {
while (instances != parallelism) {
lock.wait();
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]