This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new db91239b92b FLINK-32695-ReactiveModeITCase-NewSource-Migration
db91239b92b is described below

commit db91239b92b9db6055a799cb61cdb4632b7a0dda
Author: Poorvank <poorv...@uber.com>
AuthorDate: Fri Jul 25 13:37:26 2025 +0530

    FLINK-32695-ReactiveModeITCase-NewSource-Migration
---
 .../flink/test/scheduling/ReactiveModeITCase.java  | 94 ++++++++++------------
 1 file changed, 41 insertions(+), 53 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index dd8865c2509..89bcf9b8dd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -19,11 +19,15 @@
 package org.apache.flink.test.scheduling;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
@@ -32,8 +36,6 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
@@ -82,7 +84,15 @@ public class ReactiveModeITCase extends TestLogger {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // we set maxParallelism = 1 and assert it never exceeds it
         final DataStream<String> input =
-                env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
+                env.fromSource(
+                                new DataGeneratorSource<>(
+                                        (GeneratorFunction<Long, String>) index -> "test",
+                                        Long.MAX_VALUE,
+                                        RateLimiterStrategy.perSecond(10),
+                                        Types.STRING),
+                                WatermarkStrategy.noWatermarks(),
+                                "fail-on-parallel-source")
+                        .setMaxParallelism(1);
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -95,7 +105,15 @@ public class ReactiveModeITCase extends TestLogger {
     @Test
     public void testScaleUpOnAdditionalTaskManager() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        final DataStream<String> input = env.addSource(new DummySource());
+        final DataStream<String> input =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                (GeneratorFunction<Long, String>) index -> "test",
+                                Long.MAX_VALUE,
+                                RateLimiterStrategy.perSecond(100),
+                                Types.STRING),
+                        WatermarkStrategy.noWatermarks(),
+                        "dummy-source");
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -117,7 +135,15 @@ public class ReactiveModeITCase extends TestLogger {
     @Test
     public void testJsonPlanParallelismAfterRescale() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        final DataStream<String> input = env.addSource(new DummySource());
+        final DataStream<String> input =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                (GeneratorFunction<Long, String>) index -> "test",
+                                Long.MAX_VALUE,
+                                RateLimiterStrategy.perSecond(100),
+                                Types.STRING),
+                        WatermarkStrategy.noWatermarks(),
+                        "dummy-source");
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -167,7 +193,15 @@ public class ReactiveModeITCase extends TestLogger {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // configure exactly one restart to avoid restart loops in error cases
         RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
-        final DataStream<String> input = env.addSource(new DummySource());
+        final DataStream<String> input =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                (GeneratorFunction<Long, String>) index -> "test",
+                                Long.MAX_VALUE,
+                                RateLimiterStrategy.perSecond(100),
+                                Types.STRING),
+                        WatermarkStrategy.noWatermarks(),
+                        "dummy-source");
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -199,52 +233,6 @@ public class ReactiveModeITCase extends TestLogger {
         CommonTestUtils.waitUntilCondition(() -> getNumberOfConnectedTaskManagers() == 2);
     }
 
-    private static class DummySource implements SourceFunction<String> {
-        private volatile boolean running = true;
-
-        @Override
-        public void run(SourceContext<String> ctx) throws Exception {
-            while (running) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect("test");
-                }
-                Thread.sleep(10);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            running = false;
-        }
-    }
-
-    private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
-        private volatile boolean running = true;
-
-        @Override
-        public void open(OpenContext openContext) throws Exception {
-            if (getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() > 1) {
-                throw new IllegalStateException(
-                        "This is not supposed to be executed in parallel, despite extending the right base class.");
-            }
-        }
-
-        @Override
-        public void run(SourceContext<String> ctx) throws Exception {
-            while (running) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect("test");
-                }
-                Thread.sleep(100);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            running = false;
-        }
-    }
-
     public static void waitUntilParallelismForVertexReached(
             RestClusterClient<?> restClusterClient, JobID jobId, int targetParallelism)
             throws Exception {