This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new db91239b92b FLINK-32695-ReactiveModeITCase-NewSource-Migration
db91239b92b is described below

commit db91239b92b9db6055a799cb61cdb4632b7a0dda
Author: Poorvank <poorv...@uber.com>
AuthorDate: Fri Jul 25 13:37:26 2025 +0530

    FLINK-32695-ReactiveModeITCase-NewSource-Migration
---
 .../flink/test/scheduling/ReactiveModeITCase.java  | 94 ++++++++++------------
 1 file changed, 41 insertions(+), 53 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index dd8865c2509..89bcf9b8dd0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -19,11 +19,15 @@
 package org.apache.flink.test.scheduling;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
@@ -32,8 +36,6 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
@@ -82,7 +84,15 @@ public class ReactiveModeITCase extends TestLogger {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // we set maxParallelism = 1 and assert it never exceeds it
         final DataStream<String> input =
-                env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
+                env.fromSource(
+                                new DataGeneratorSource<>(
+                                        (GeneratorFunction<Long, String>) index -> "test",
+                                        Long.MAX_VALUE,
+                                        RateLimiterStrategy.perSecond(10),
+                                        Types.STRING),
+                                WatermarkStrategy.noWatermarks(),
+                                "fail-on-parallel-source")
+                        .setMaxParallelism(1);
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -95,7 +105,15 @@ public class ReactiveModeITCase extends TestLogger {
     @Test
     public void testScaleUpOnAdditionalTaskManager() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        final DataStream<String> input = env.addSource(new DummySource());
+        final DataStream<String> input =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                (GeneratorFunction<Long, String>) index -> "test",
+                                Long.MAX_VALUE,
+                                RateLimiterStrategy.perSecond(100),
+                                Types.STRING),
+                        WatermarkStrategy.noWatermarks(),
+                        "dummy-source");
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -117,7 +135,15 @@ public class ReactiveModeITCase extends TestLogger {
     @Test
     public void testJsonPlanParallelismAfterRescale() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        final DataStream<String> input = env.addSource(new DummySource());
+        final DataStream<String> input =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                (GeneratorFunction<Long, String>) index -> "test",
+                                Long.MAX_VALUE,
+                                RateLimiterStrategy.perSecond(100),
+                                Types.STRING),
+                        WatermarkStrategy.noWatermarks(),
+                        "dummy-source");
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -167,7 +193,15 @@ public class ReactiveModeITCase extends TestLogger {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // configure exactly one restart to avoid restart loops in error cases
         RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
-        final DataStream<String> input = env.addSource(new DummySource());
+        final DataStream<String> input =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                (GeneratorFunction<Long, String>) index -> "test",
+                                Long.MAX_VALUE,
+                                RateLimiterStrategy.perSecond(100),
+                                Types.STRING),
+                        WatermarkStrategy.noWatermarks(),
+                        "dummy-source");
         input.sinkTo(new DiscardingSink<>());
 
         final JobClient jobClient = env.executeAsync();
@@ -199,52 +233,6 @@ public class ReactiveModeITCase extends TestLogger {
         CommonTestUtils.waitUntilCondition(() -> getNumberOfConnectedTaskManagers() == 2);
     }
 
-    private static class DummySource implements SourceFunction<String> {
-        private volatile boolean running = true;
-
-        @Override
-        public void run(SourceContext<String> ctx) throws Exception {
-            while (running) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect("test");
-                }
-                Thread.sleep(10);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            running = false;
-        }
-    }
-
-    private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
-        private volatile boolean running = true;
-
-        @Override
-        public void open(OpenContext openContext) throws Exception {
-            if (getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() > 1) {
-                throw new IllegalStateException(
-                        "This is not supposed to be executed in parallel, despite extending the right base class.");
-            }
-        }
-
-        @Override
-        public void run(SourceContext<String> ctx) throws Exception {
-            while (running) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect("test");
-                }
-                Thread.sleep(100);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            running = false;
-        }
-    }
-
     public static void waitUntilParallelismForVertexReached(
            RestClusterClient<?> restClusterClient, JobID jobId, int targetParallelism)
             throws Exception {

Reply via email to