This is an automated email from the ASF dual-hosted git repository.

afedulov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 48359a32 [FLINK-33525] Move ImpulseSource to new Source API (#950)
48359a32 is described below

commit 48359a326f417081bcfad984cbdab578ef9c906d
Author: PB <[email protected]>
AuthorDate: Mon Mar 10 22:57:03 2025 +0530

    [FLINK-33525] Move ImpulseSource to new Source API (#950)
---
 examples/autoscaling/pom.xml                       |  7 +++
 .../java/autoscaling/LoadSimulationPipeline.java   | 69 +++++++++++++---------
 2 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/examples/autoscaling/pom.xml b/examples/autoscaling/pom.xml
index 6da06a85..993b22dd 100644
--- a/examples/autoscaling/pom.xml
+++ b/examples/autoscaling/pom.xml
@@ -45,6 +45,13 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-datagen</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients</artifactId>
diff --git a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
index 0ddb2302..14865e1b 100644
--- a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
+++ b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
@@ -18,12 +18,16 @@
 
 package autoscaling;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
 import org.slf4j.Logger;
@@ -60,6 +64,11 @@ public class LoadSimulationPipeline {
 
     private static final Logger LOG = LoggerFactory.getLogger(LoadSimulationPipeline.class);
 
+    // Number of impulses (records) emitted per sampling interval.
+    // This value determines how many records should be generated within each `samplingIntervalMs`
+    // period.
+    private static final int IMPULSES_PER_SAMPLING_INTERVAL = 10;
+
     public static void main(String[] args) throws Exception {
         var env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.disableOperatorChaining();
@@ -74,8 +83,39 @@ public class LoadSimulationPipeline {
         for (String branch : maxLoadPerTask.split("\n")) {
             String[] taskLoads = branch.split(";");
 
+            /*
+             * Creates an unbounded stream that continuously emits the constant value 42L.
+             * Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
+             *
+             * Emission Rate Logic:
+             * - The goal is to generate a fixed number of impulses per sampling interval.
+             * - `samplingIntervalMs` defines the duration of one sampling interval in milliseconds.
+             * - We define `IMPULSES_PER_SAMPLING_INTERVAL = 10`, meaning that for every sampling interval,
+             *   exactly 10 impulses should be generated.
+             *
+             * To calculate the total number of records emitted per second:
+             * 1. Determine how many sampling intervals fit within one second:
+             *      samplingIntervalsPerSecond = 1000 / samplingIntervalMs;
+             * 2. Multiply this by the number of impulses per interval to get the total rate:
+             *      impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
+             *
+             * Example:
+             * - If `samplingIntervalMs = 500 ms` and `IMPULSES_PER_SAMPLING_INTERVAL = 10`:
+             *      impulsesPerSecond = (1000 / 500) * 10 = 2 * 10 = 20 records per second.
+             */
             DataStream<Long> stream =
-                    env.addSource(new ImpulseSource(samplingIntervalMs)).name("ImpulseSource");
+                    env.fromSource(
+                            new DataGeneratorSource<>(
+                                    (GeneratorFunction<Long, Long>)
+                                            (index) -> 42L, // Emits constant value 42
+                                    Long.MAX_VALUE, // Unbounded stream
+                                    RateLimiterStrategy.perSecond(
+                                            (1000.0 / samplingIntervalMs)
+                                                    * IMPULSES_PER_SAMPLING_INTERVAL), // Controls
+                                    // rate
+                                    Types.LONG),
+                            WatermarkStrategy.noWatermarks(),
+                            "ImpulseSource");
 
             for (String load : taskLoads) {
                 double maxLoad = Double.parseDouble(load);
@@ -97,31 +137,6 @@ public class LoadSimulationPipeline {
                         + ")");
     }
 
-    private static class ImpulseSource implements SourceFunction<Long> {
-        private final int maxSleepTimeMs;
-        volatile boolean canceled;
-
-        public ImpulseSource(int samplingInterval) {
-            this.maxSleepTimeMs = samplingInterval / 10;
-        }
-
-        @Override
-        public void run(SourceContext<Long> sourceContext) throws Exception {
-            while (!canceled) {
-                synchronized (sourceContext.getCheckpointLock()) {
-                    sourceContext.collect(42L);
-                }
-                // Provide an impulse to keep the load simulation active
-                Thread.sleep(maxSleepTimeMs);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            canceled = true;
-        }
-    }
-
     private static class LoadSimulationFn extends RichFlatMapFunction<Long, Long> {
 
         private final double maxLoad;

Reply via email to